Source code for ccpn.core.lib.PeakPickers.PeakPickerNd

"""
Module Documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-04 15:43:14 +0000 (Fri, March 04, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Ed Brooksbank $"
__date__ = "$Date: 2021-02-08 11:42:15 +0000 (Mon, February 08, 2021) $"
#=========================================================================================
# Start of code
#=========================================================================================

import math
import numpy as np
from typing import Sequence
from collections import Counter
from ccpn.core.lib.PeakPickers.PeakPickerABC import PeakPickerABC, SimplePeak
from ccpn.util.traits.CcpNmrTraits import CFloat, CInt, CBool, CList
from ccpn.util.Logging import getLogger
from ccpnc.peak import Peak as CPeak


GAUSSIANMETHOD = 'gaussian'
LORENTZIANMETHOD = 'lorentzian'
PARABOLICMETHOD = 'parabolic'
PICKINGMETHODS = (GAUSSIANMETHOD, LORENTZIANMETHOD, PARABOLICMETHOD)


[docs]class PeakPickerNd(PeakPickerABC): """A simple Nd peak picker for testing """ peakPickerType = "PeakPickerNd" onlyFor1D = False # list of peakPicker attributes that need to be stored/restored # these pertain to a particular peakPicker subclass # this cannot contain items that are not JSON serialisable noise = CFloat(allow_none=True, default_value=None) minimumLineWidth = CList(allow_none=True, default_value=[]) checkAllAdjacent = CBool(allow_none=True, default_value=True) singularMode = CBool(allow_none=True, default_value=True) halfBoxFindPeaksWidth = CInt(allow_none=True, default_value=2) halfBoxSearchWidth = CInt(allow_none=True, default_value=3) halfBoxFitWidth = CInt(allow_none=True, default_value=3) searchBoxDoFit = CBool(allow_none=True, default_value=True) setLineWidths = CBool(allow_none=True, default_value=True) searchBoxMode = CBool(allow_none=True, default_value=True) def __init__(self, spectrum): super().__init__(spectrum=spectrum) self.positiveThreshold = spectrum.positiveContourBase if spectrum.includePositiveContours else None self.negativeThreshold = spectrum.negativeContourBase if spectrum.includeNegativeContours else None self.fitMethod = PARABOLICMETHOD self._hbsWidth = None self._hbfWidth = None self.findFunc = None
[docs] def findPeaks(self, data) -> list: """find the peaks in data (type numpy-array) and return as a list of SimplePeak instances note that SimplePeak.points are ordered z,y,x for nD, in accordance with the numpy nD data array called from the pickPeaks() method any required parameters that findPeaks method needs should be initialised/set before using the setParameters() method; i.e.: myPeakPicker = PeakPicker(spectrum=mySpectrum) myPeakPicker.setParameters(dropFactor=0.2, positiveThreshold=1e6, negativeThreshold=None) corePeaks = myPeakPicker.pickPeaks(axisDict={'H':(6.0,11.5),'N':(102.3,130.0)}, spectrum.peaklists[-1]) :param data: nD numpy array :return list of SimplePeak instances """ # find the list of peaks in the region allPeaksArray, allRegionArrays, regionArray, _ = self._findPeaks(data, self.positiveThreshold, self.negativeThreshold) # if peaks exist then create a list of simple peaks if allPeaksArray is not None and allPeaksArray.size != 0: fitMethod = self.fitMethod singularMode = self.singularMode try: result = () if fitMethod == PARABOLICMETHOD: # and singularMode is True: # parabolic - generate all peaks in one operation result = CPeak.fitParabolicPeaks(data, regionArray, allPeaksArray) else: method = 0 if fitMethod == GAUSSIANMETHOD else 1 # currently gaussian or lorentzian if singularMode is True: _errorMsgs = [] # fit peaks individually in small regions for peakArray, localRegionArray in zip(allPeaksArray, allRegionArrays): peakArray = peakArray.reshape((1, self.dimensionCount)) peakArray = peakArray.astype(np.float32) try: localResult = CPeak.fitPeaks(data, localRegionArray, peakArray, method) result += tuple(localResult) except Exception as es: # catch all errors as a single report _errorMsgs.append(f'failed to fit peak: {peakArray}\n{es}') if _errorMsgs: getLogger().warning('\n'.join(_errorMsgs)) else: # fit all peaks in one operation result = CPeak.fitPeaks(data, regionArray, allPeaksArray, method) func = self.findFunc return func(result) # return [SimplePeak(points=point[::-1], height=height, lineWidths=pointLineWidths) # for height, point, pointLineWidths in result] except CPeak.error as es: getLogger().warning(f'Aborting peak fit: {es}') return [] return []
def _findPeaks(self, data, posThreshold, negThreshold): """find the peaks in data (type numpy-array) and return as a list of SimplePeak instances """ # NOTE:ED - need to validate the parameters first if self.noise is None: getLogger().debug('spectrum.estimateNoise on findPeaks') self.noise = self.spectrum.estimateNoise() # set threshold values doPos = posThreshold is not None doNeg = negThreshold is not None posLevel = posThreshold or 0.0 negLevel = negThreshold or 0.0 # accounted for by pickPeaks in superclass exclusionBuffer = [0] * self.dimensionCount excludedRegionsList = [] excludedDiagonalDimsList = [] excludedDiagonalTransformList = [] nonAdj = 1 if self.checkAllAdjacent else 0 minLinewidth = [0.0] * self.dimensionCount if not self.minimumLineWidth else self.minimumLineWidth pointPeaks = CPeak.findPeaks(data, doNeg, doPos, negLevel, posLevel, exclusionBuffer, nonAdj, self.dropFactor, minLinewidth, excludedRegionsList, excludedDiagonalDimsList, excludedDiagonalTransformList) # the above can be replaced with the peak list for # get the peak maxima from pointPeaks pointPeaks = [(np.array(position), height) for position, height in pointPeaks] # ignore exclusion buffer for the minute validPointPeaks = [pk for pk in pointPeaks] allPeaksArray = None allRegionArrays = [] regionArray = None # get the offset of the bottom left of the slice region startPoint = np.array([pp[0] for pp in self.sliceTuples]) endPoint = np.array([pp[1] for pp in self.sliceTuples]) numPointInt = (endPoint - startPoint) + 1 for position, height in validPointPeaks: # get the region containing this point bLeft = np.maximum(position - self._hbsWidth, 0) tRight = np.minimum(position + self._hbsWidth + 1, numPointInt) localRegionArray = np.array((bLeft, tRight), dtype=np.int32) # get the larger regionArray size containing all points so far # the actual picked region may be huge, only need the bounds containing the maxima bLeftAll = np.maximum(position - self._hbsWidth - 1, 0) tRightAll = np.minimum(position + self._hbsWidth + 1, numPointInt) if regionArray is not None: bLeftAll = np.array(np.minimum(bLeftAll, regionArray[0]), dtype=np.int32) tRightAll = np.array(np.maximum(tRightAll, regionArray[1]), dtype=np.int32) # numpy arrays need tweaking to pass to the c code peakArray = position.reshape((1, self.dimensionCount)) peakArray = peakArray.astype(np.float32) regionArray = np.array((bLeftAll, tRightAll), dtype=np.int32) if allPeaksArray is None: allPeaksArray = peakArray else: allPeaksArray = np.append(allPeaksArray, peakArray, axis=0) allRegionArrays.append(localRegionArray) return allPeaksArray, allRegionArrays, regionArray, validPointPeaks
[docs] def pickPeaks(self, sliceTuples, peakList, positiveThreshold=None, negativeThreshold=None) -> list: """Set the default functionality for picking simplePeaks from the region defined by axisDict """ # set the correct parameters for the standard findPeaks self._hbsWidth = self.halfBoxFindPeaksWidth self.findFunc = self._returnSimplePeaks return super().pickPeaks(sliceTuples=sliceTuples, peakList=peakList, positiveThreshold=positiveThreshold, negativeThreshold=negativeThreshold)
def _returnSimplePeaks(self, foundPeaks): """Return a list of SimplePeak objects from the height/point/lineWidth foundPeaks list """ peaks = [SimplePeak(points=point[::-1], height=height, lineWidths=pointLineWidths if self.setLineWidths else None) for height, point, pointLineWidths in foundPeaks] return peaks
[docs] def snapToExtremum(self, peak) -> list: """ :param axisDict: Axis limits are passed in as a dict of (axisCode, tupleLimit) key, value pairs with the tupleLimit supplied as (start,stop) axis limits in ppm (lower ppm value first). :param peakList: peakList instance to add newly pickedPeaks :return: list of core.Peak instances """ if self.spectrum is None: raise RuntimeError('%s.spectrum is None' % self.__class__.__name__) if not self.spectrum.hasValidPath(): raise RuntimeError('%s.pickPeaks: spectrum %s, No valid spectral datasource defined' % (self.__class__.__name__, self.spectrum)) # set up the search box widths self._hbsWidth = self.halfBoxSearchWidth self._hbfWidth = self.halfBoxFitWidth pointPositions = peak.pointPositions spectrum = peak.spectrum valuesPerPoint = spectrum.ppmPerPoints axisCodes = spectrum.axisCodes # searchBox for Nd if self.searchBoxMode: boxWidths = [] axisCodes = self.spectrum.axisCodes valuesPerPoint = self.spectrum.ppmPerPoints for axisCode, valuePerPoint in zip(axisCodes, valuesPerPoint): letterAxisCode = (axisCode[0] if axisCode != 'intensity' else axisCode) if axisCode else None if letterAxisCode in self.searchBoxWidths: newWidth = math.floor(self.searchBoxWidths[letterAxisCode] / (2.0 * valuePerPoint)) boxWidths.append(max(1, newWidth)) else: # default to the given parameter value boxWidths.append(max(1, self._hbsWidth or 1)) else: boxWidths = [] pointCounts = spectrum.pointCounts peakBoxWidths = peak.boxWidths for axisCode, pointCount, peakBWidth, valuePPoint in zip(axisCodes, pointCounts, peakBoxWidths, valuesPerPoint): _halfBoxWidth = pointCount / 100 # copied from V2 boxWidths.append(max(_halfBoxWidth, 1, int((peakBWidth or 1) / 2))) # add the new boxWidths array as np.int32 type boxWidths = np.array(boxWidths, dtype=np.int32) pLower = np.floor(pointPositions).astype(np.int32) # find the co-ordinates of the lower corner of the data region startPoint = np.maximum(pLower - boxWidths, 0) self.sliceTuples = [(int(pos - bWidth), int(pos + bWidth + 1)) for pos, bWidth in zip(pointPositions, boxWidths)] data = self.spectrum.dataSource.getRegionData(self.sliceTuples, aliasingFlags=[1] * self.spectrum.dimensionCount) # getLogger().debug('%s.snapToExtremum: found %d peaks in spectrum %s; %r' % # (self.__class__.__name__, len(peaks), self.spectrum, axisDict)) # get the height of the current peak (to stop peak flipping) height = spectrum.getPointValue(peak.pointPositions) scaledHeight = 0.5 * height # this is so that have sensible pos/negLevel if height > 0: posLevel = scaledHeight negLevel = None else: posLevel = None negLevel = scaledHeight # find the list of peaks in the region allPeaksArray, allRegionArrays, regionArray, validPoints = self._findPeaks(data, posLevel, negLevel) # if peaks exist then create a list of simple peaks if allPeaksArray is not None and allPeaksArray.size != 0: # find the closest peak in the found list bestHeight = peakPoint = None bestFit = 0.0 validPoints = sorted(validPoints, key=lambda val: abs(val[1])) for ii, findNextPeak in enumerate(validPoints): # find the highest peak to start from peakHeight = findNextPeak[1] peakDist = np.linalg.norm((np.array(findNextPeak[0]) - boxWidths) / boxWidths) peakFit = abs(height) / (1e-6 + peakDist) if height == None or peakFit > bestFit: bestFit = peakFit bestHeight = abs(peakHeight) peakPoint = findNextPeak[0] # use this as the centre for the peak fitting peakPoint = np.array(peakPoint) peakArray = peakPoint.reshape((1, spectrum.dimensionCount)) peakArray = peakArray.astype(np.float32) try: if self.searchBoxDoFit: if self.fitMethod == PARABOLICMETHOD: # parabolic - generate all peaks in one operation result = CPeak.fitParabolicPeaks(data, regionArray, peakArray) else: method = 0 if self.fitMethod == GAUSSIANMETHOD else 1 # use the halfBoxFitWidth to give a close fit firstArray = np.maximum(peakArray[0] - self._hbfWidth, regionArray[0]) lastArray = np.minimum(peakArray[0] + self._hbfWidth + 1, regionArray[1]) regionArray = np.array((firstArray, lastArray), dtype=np.int32) # fit the single peak result = CPeak.fitPeaks(data, regionArray, peakArray, method) else: # take the maxima result = ((bestHeight, peakPoint, None),) # if any results are found then set the new peak position/height if result: height, center, linewidth = result[0] _shape = data.shape[::-1] newPos = list(peak.pointPositions) for ii in range(len(center)): if 0.5 < center[ii] < (_shape[ii] - 0.5): newPos[ii] = float(center[ii] + startPoint[ii]) peak.pointPositions = newPos if linewidth: peak.pointLineWidths = linewidth if self.searchBoxDoFit: peak.height = height else: # get the interpolated height peak.height = spectrum.getPointValue(peak.pointPositions) except CPeak.error as es: getLogger().warning(f'Aborting peak fit: {es}') return [] return []
[docs] def fitExistingPeaks(self, peaks: Sequence['Peak']): """Refit the current selected peaks. Must be called with peaks that belong to this peakList """ if not peaks: return # set the correct parameters for the standard findPeaks self._hbsWidth = self.halfBoxFindPeaksWidth allPeaksArray = None allRegionArrays = [] regionArray = None numLists = Counter([pk.peakList for pk in peaks]) if len(numLists) > 1: raise ValueError('List contains peaks from more than one peakList.') # pointPositions = peaks[0].pointPositions spectrum = peaks[0].spectrum pointCounts = spectrum.pointCounts numDim = spectrum.dimensionCount for peak in peaks: pointPositions = peak.pointPositions # round up/down the position pLower = np.floor(pointPositions).astype(np.int32) pUpper = np.ceil(pointPositions).astype(np.int32) position = np.round(np.array(pointPositions)) # generate a np array with the number of points per dimension numPoints = np.array(pointCounts) # consider for each dimension on the interval [point-3,point+4>, account for min and max # of each dimension if self.fitMethod == PARABOLICMETHOD or self.singularMode is True: firstArray = np.maximum(pLower - self._hbsWidth, 0) lastArray = np.minimum(pUpper + self._hbsWidth, numPoints) else: # extra plane in each direction increases accuracy of group fitting firstArray = np.maximum(pLower - self._hbsWidth - 1, 0) lastArray = np.minimum(pUpper + self._hbsWidth + 1, numPoints) # Cast to int for subsequent call firstArray = firstArray.astype(np.int32) lastArray = lastArray.astype(np.int32) localRegionArray = np.array((firstArray, lastArray), dtype=np.int32) if regionArray is not None: firstArray = np.minimum(firstArray, regionArray[0]) lastArray = np.maximum(lastArray, regionArray[1]) # requires reshaping of the array for use with CPeak.fitParabolicPeaks peakArray = position.reshape((1, numDim)) peakArray = peakArray.astype(np.float32) regionArray = np.array((firstArray, lastArray), dtype=np.int32) if allPeaksArray is None: allPeaksArray = peakArray else: allPeaksArray = np.append(allPeaksArray, peakArray, axis=0) allRegionArrays.append(localRegionArray) if allPeaksArray is not None and allPeaksArray.size != 0: # map to (0, 0) regionArray = np.array((firstArray - firstArray, lastArray - firstArray)) self.sliceTuples = [(fst, lst) for fst, lst in zip(firstArray, lastArray)] data = self.spectrum.dataSource.getRegionData(self.sliceTuples, aliasingFlags=[1] * self.spectrum.dimensionCount) # update positions relative to the corner of the data array firstArray = firstArray.astype(np.float32) updatePeaksArray = None for pk in allPeaksArray: if updatePeaksArray is None: updatePeaksArray = pk - firstArray updatePeaksArray = updatePeaksArray.reshape((1, numDim)) updatePeaksArray = updatePeaksArray.astype(np.float32) else: pk = pk - firstArray pk = pk.reshape((1, numDim)) pk = pk.astype(np.float32) updatePeaksArray = np.append(updatePeaksArray, pk, axis=0) try: result = () # NOTE:ED - groupMode must be gaussian if self.fitMethod == PARABOLICMETHOD and self.singularMode is True: # parabolic - generate all peaks in one operation result = CPeak.fitParabolicPeaks(data, regionArray, updatePeaksArray) else: method = 0 if self.fitMethod == GAUSSIANMETHOD else 1 # currently gaussian or lorentzian if self.singularMode is True: # fit peaks individually for peakArray, localRegionArray in zip(allPeaksArray, allRegionArrays): peakArray = peakArray - firstArray peakArray = peakArray.reshape((1, numDim)) peakArray = peakArray.astype(np.float32) localRegionArray = np.array((localRegionArray[0] - firstArray, localRegionArray[1] - firstArray), dtype=np.int32) localResult = CPeak.fitPeaks(data, localRegionArray, peakArray, method) result += tuple(localResult) else: # fit all peaks in one operation result = CPeak.fitPeaks(data, regionArray, updatePeaksArray, method) except CPeak.error as e: # there could be some fitting error getLogger().warning("Aborting peak fit, Error for peak: %s:\n\n%s " % (peak, e)) return for pkNum, peak in enumerate(peaks): height, center, linewidth = result[pkNum] _shape = data.shape[::-1] newPos = list(peak.pointPositions) for ii in range(len(center)): if 0.5 < center[ii] < (_shape[ii] - 0.5): newPos[ii] = center[ii] + firstArray[ii] peak.pointPositions = newPos peak.pointLineWidths = linewidth peak.height = height
# NOTE:ED - example for plotting in pycharm - keep for the minute # # 20191004:ED testing - plot the dataArray during debugging # import np as np # from mpl_toolkits import mplot3d # import matplotlib.pyplot as plt # # fig = plt.figure(figsize=(10, 8), dpi=100) # ax = fig.gca(projection='3d') # # shape = dataArray.shape # rr = (np.max(dataArray) - np.min(dataArray)) * 100 # # from ccpn.ui.gui.lib.GuiSpectrumViewNd import _getLevels # posLevels = _getLevels(spectrum.positiveContourCount, spectrum.positiveContourBase, # spectrum.positiveContourFactor) # posLevels = np.array(posLevels) # # dims = [] # for ii in shape: # dims.append(np.linspace(0, ii-1, ii)) # # for ii in range(shape[0]): # try: # ax.contour(dims[2], dims[1], dataArray[ii] / rr, posLevels / rr, offset=(shape[0]-ii-1), cmap=plt.cm.viridis) # except Exception as es: # pass # trap stupid plot error # # ax.legend() # ax.set_xlim3d(-0.1, shape[2]-0.9) # ax.set_ylim3d(-0.1, shape[1]-0.9) # ax.set_zlim3d(-0.1, shape[0]-0.9) # # plt.show() is at the bottom of function # find new peaks # self.fitExistingPeaks(peaks, fitMethod=fitMethod, singularMode=True) # # 20191004:ED testing - plotting scatterplot of data # else: # # # result = CPeak.fitPeaks(dataArray, regionArray, allPeaksArray, method) # result = CPeak.fitParabolicPeaks(dataArray, regionArray, allPeaksArray) # # for height, centerGuess, linewidth in result: # # # clip the point to the exclusion area, to stop rogue peaks # # center = np.array(centerGuess).clip(min=position - npExclusionBuffer, # # max=position + npExclusionBuffer) # center = np.array(centerGuess) # # # outofPlaneMinTest = np.array([]) # # outofPlaneMaxTest = np.array([]) # # for ii in range(numDim): # # outofPlaneMinTest = np.append(outofPlaneMinTest, 0.0) # # outofPlaneMaxTest = np.append(outofPlaneMaxTest, dataArray.shape[numDim-ii-1]-1.0) # # # # # check whether the new peak is outside of the current plane # # outofPlaneCenter = np.array(centerGuess).clip(min=position - np.array(outofPlaneMinTest), # # max=position + np.array(outofPlaneMaxTest)) # # # # print(">>>", center, outofPlaneCenter, not np.array_equal(center, outofPlaneCenter)) # # # ax.scatter(*center, c='r', marker='^') # # # # x2, y2, _ = mplot3d.proj3d.proj_transform(1, 1, 1, ax.get_proj()) # # # # ax.text(*center, str(center), fontsize=12) # # # except Exception as es: # # print('>>>error:', str(es)) # # dimCount = len(startPoints) # # height = float(dataArray[tuple(position[::-1])]) # # # have to reverse position because dataArray backwards # # # have to float because API does not like np.float32 # # center = position # # linewidth = dimCount * [None] # # position = center + startPointBufferActual # # peak = self._newPickedPeak(pointPositions=position, height=height, # lineWidths=linewidth, fitMethod=fitMethod) # peaks.append(peak) # # plt.show() PeakPickerNd._registerPeakPicker()