# Source code for ccpn.core.lib.PeakPickers.PeakPickerABC

"""
PeakPicker abstract base class
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license",
               )
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y"
                )
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-01-21 11:22:07 +0000 (Fri, January 21, 2022) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: geertenv $"
__date__ = "$Date: 2021-01-13 10:28:41 +0000 (Wed, Jan 13, 2021) $"
#=========================================================================================
# Start of code
#=========================================================================================

from json import loads
from collections import OrderedDict
import numpy as np
from ccpn.util.traits.CcpNmrJson import CcpNmrJson
from ccpn.util.traits.CcpNmrTraits import CFloat, CInt, CBool, CString
from ccpn.util.Logging import getLogger
from ccpn.util.Common import loadModules
from ccpn.framework.PathsAndUrls import peakPickerPath
from collections import defaultdict

# Name of the spectrum internal parameter that holds the json-serialised peakPicker
# settings; written by PeakPickerABC._storeAttributes, read by PeakPickerABC._restoreAttributes
# and cleared (set to None) by PeakPickerABC._detachFromSpectrum
PEAKPICKERPARAMETERS = '_peakPickerParameters'


#=========================================================================================
# Available peakPicker methods
#=========================================================================================

def getPeakPickerTypes() -> OrderedDict:
    """Return the registry of available peakPicker classes

    On the first call all modules found under peakPickerPath are loaded with loadModules();
    PeakPickerABC._loadedPeakPickers is then set, so that subsequent calls skip the loading.
    (Presumably each loaded module registers its class through
    PeakPickerABC._registerPeakPicker - to be confirmed against the individual peak pickers.)

    :return: PeakPickerABC._peakPickers; a dictionary of
             (type-identifier-strings, PeakPicker classes) as (key, value) pairs
    """
    alreadyLoaded = PeakPickerABC._loadedPeakPickers
    if not alreadyLoaded:
        # peak pickers are not imported one-by-one here; the whole folder is loaded instead
        loadModules([peakPickerPath])
        PeakPickerABC._loadedPeakPickers = True

    return PeakPickerABC._peakPickers
def isRegistered(peakPickerType):
    """Tell whether a PeakPicker class has been registered under peakPickerType

    :param peakPickerType: type str; reference to peakPickerType of peakPicker class
    :return: True if the lookup of peakPickerType in the dictionary returned by
             getPeakPickerTypes() yields a class (i.e. not None), else False
    """
    registeredClass = getPeakPickerTypes().get(peakPickerType)
    return registeredClass is not None
class SimplePeak(object):
    """A simple class to hold peak data

    Each instance gets a running index (indx) taken from the class-level counter
    currentIndx, which is incremented on every instantiation.
    """
    # class-wide counter; its current value becomes the indx of the next instance
    currentIndx = 0

    def __init__(self, points, height, lineWidths=None, volume=None, clusterId=None):
        """
        :param points: list/tuple of points (0-based); z,y,x ordered in case of nD (i.e. numpy ordering);
                       stored as a tuple
        :param height: height of the peak; may be None if not (yet) defined
                       (PeakPickerABC._createCorePeaks fills in a None height from the data)
        :param lineWidths: list/tuple with lineWidths of the peak for each dimension (in points),
                           optional, None if not defined
        :param volume: volume of the peak; optional, None if not defined
        :param clusterId: id of the peak cluster (i.e. a group of peaks in close proximity);
                          optional, None if not defined
        """
        self.indx = SimplePeak.currentIndx
        SimplePeak.currentIndx += 1

        self.points = tuple(points)
        self.height = height
        self.lineWidths = lineWidths
        self.volume = volume
        self.clusterId = clusterId

    def __str__(self):
        # height is allowed to be None; '%.1e' % None raises a TypeError, which would also
        # break any error message that embeds the peak, so format it separately
        if self.height is None:
            heightStr = 'None'
        else:
            heightStr = '%.1e' % self.height
        return '<SimplePeak %s: %r, height=%s>' % (self.indx, self.points, heightStr)
#========================================================================================= # Start of class #=========================================================================================
class PeakPickerABC(CcpNmrJson):
    """ABC for implementation of a peak picker

    A subclass is expected to:
      - define a unique peakPickerType string;
      - implement findPeaks();
      - optionally redefine defaultPointExtension and onlyFor1D;
      - be registered by calling its _registerPeakPicker() classmethod once after its definition.

    The trait-defined parameters of an instance are saved as json in the internal parameters
    of the associated spectrum (see _storeAttributes / _restoreAttributes).
    """

    classVersion = 1.0  # For json saving

    #=========================================================================================
    # to be subclassed
    #=========================================================================================

    peakPickerType = None  # A unique string identifying the peak picker
    defaultPointExtension = 1  # points to extend the region to pick on either side
    onlyFor1D = False  # if True, __init__ refuses spectra with dimensionCount > 1

    #=========================================================================================
    # data formats
    #=========================================================================================

    # A dict of registered dataFormat: filled by _registerPeakPicker classmethod, called once after
    # each definition of a new derived class
    _peakPickers = OrderedDict()
    _loadedPeakPickers = False  # set to True by getPeakPickerTypes() once the peak-picker folder was loaded

    @classmethod
    def _registerPeakPicker(cls):
        """Register cls under cls.peakPickerType

        A peakPickerType that is already present in the registry is left untouched (reported
        in the debug log); otherwise cls is added to the shared registry and registered with
        the json machinery of the parent class.
        """
        if cls.peakPickerType in cls._peakPickers:
            getLogger().debug(f'PeakPicker "{cls.peakPickerType}" was already registered')
        else:
            # always write to the registry on the base class, so all subclasses share one dict
            PeakPickerABC._peakPickers[cls.peakPickerType] = cls
            # register for restoring from json
            super(PeakPickerABC, cls).register()
            getLogger().info(f'Registering peakPicker class {cls.peakPickerType}')

    #=========================================================================================
    # parameter definitions and mappings onto the Spectrum class
    #=========================================================================================

    keysInOrder = True  # maintain the definition order
    saveAllTraitsToJson = True

    # list of core peakPicker attributes that need to be restored when the spectrum is loaded
    dimensionCount = CInt(default_value=0)
    pointExtension = CInt(default_value=0)
    autoFit = CBool(default_value=False)
    dropFactor = CFloat(default_value=0.1)
    fitMethod = CString(allow_none=True, default_value=None)
    positiveThreshold = CFloat(allow_none=True, default_value=0.0)
    negativeThreshold = CFloat(allow_none=True, default_value=0.0)

    #=========================================================================================
    # start of methods
    #=========================================================================================

    def __init__(self, spectrum, autoFit=False):
        """Initialise the instance and associate with spectrum

        :param spectrum: associate instance with spectrum and import spectrum's parameters
        :param autoFit: True/False, automatically fit peaks - functionality defined by subclassed peak-pickers
        :raises RuntimeError: if the class did not define peakPickerType
        :raises ValueError: if spectrum is None, is not a Spectrum instance, or has more than
                            one dimension while the class is flagged onlyFor1D
        """
        # local import; NOTE(review): presumably to avoid a circular import - confirm
        from ccpn.core.Spectrum import Spectrum

        if self.peakPickerType is None:
            raise RuntimeError('%s: peakPickerType is undefined' % self.__class__.__name__)

        if spectrum is None:
            raise ValueError('%s: spectrum is None' % self.__class__.__name__)
        if not isinstance(spectrum, Spectrum):
            raise ValueError('%s: spectrum is not of Spectrum class' % self.__class__.__name__)
        if spectrum.dimensionCount > 1 and self.onlyFor1D:
            raise ValueError('%s only works for 1D spectra' % self.__class__.__name__)

        super().__init__()

        # default parameters for all peak pickers
        self.setDefaultParameters()

        # initialise from parameters
        self.spectrum = spectrum
        self.dimensionCount = spectrum.dimensionCount
        self.pointExtension = self.defaultPointExtension
        self.autoFit = autoFit

        # attributes not required to be persistent between load/save
        self.lastPickedPeaks = None
        self.sliceTuples = None
        # {axisCode:[[start,stop],...]]} regions to be excluded when picking, e.g.: solvents
        self._excludePpmRegions = defaultdict(list)

        # attribute needed for 1D when manually picking within a SpectrumDisplay box
        self._intensityLimits = (np.inf, -np.inf)

    def setDefaultParameters(self):
        """Set default values for all parameters, i.e. for every name in self.keys()
        """
        for par in self.keys():
            self.setTraitDefaultValue(par)

    def _setParameters(self, **parameters):
        """Set parameters for peakPicker instance

        Only names present in self.keys() are set; any other name is skipped and reported
        in the debug log, so that a mistyped parameter name does not go entirely unnoticed.

        :param parameters: dict of key, value pairs
        """
        validKeys = self.keys()
        for par, value in parameters.items():
            if par in validKeys:
                self.setTraitValue(par, value)
            else:
                getLogger().debug('%s: ignoring unknown parameter %r' % (self.__class__.__name__, par))

    def setParameters(self, **parameters):
        """Set parameters as attributes of self and store them with the spectrum

        Example calling function:

        ::

            >>> peakPicker.setParameters(**parameters)
            >>> peakPicker.setParameters(fitMethod='gaussian', dropFactor=0.1)

        The contents of parameters to be defined by the peakPicker class.
        In the above example, 'fitMethod' and 'dropFactor' are defined in the baseClass,
        but their properties and types are to be defined by the subclass.

        :param parameters: dict of key, value pairs
        :raises RuntimeError: (from _storeAttributes) if no spectrum is defined
        """
        self._setParameters(**parameters)
        self._storeAttributes()

    def _checkParameters(self):
        """Check whether the parameters are the correct types
        """
        # This can check the common parameters, subclassing can check local
        # MUST BE SUBCLASSED
        raise NotImplementedError("Code error: function not implemented")

    def _storeAttributes(self):
        """Store peakPicker attributes that need restoring when a project is reloaded

        The instance is serialised with self.toJson() and the result is stored as the
        internal parameter PEAKPICKERPARAMETERS of the spectrum.

        :raises RuntimeError: if no spectrum is defined
        """
        if self.spectrum is None:
            raise RuntimeError('%s._storeAttributes: spectrum not defined' % self.__class__.__name__)

        jsonData = self.toJson()
        self.spectrum._setInternalParameter(PEAKPICKERPARAMETERS, jsonData)

    def _restoreAttributes(self):
        """Restore important peakPicker attributes when a project is reloaded

        Reads the internal parameter PEAKPICKERPARAMETERS of the spectrum and feeds it
        to self.fromJson().

        :raises RuntimeError: if no spectrum is defined, or if the stored json data are
                              None or empty
        """
        if self.spectrum is None:
            raise RuntimeError('%s._restoreAttributes: spectrum not defined' % self.__class__.__name__)

        jsonData = self.spectrum._getInternalParameter(PEAKPICKERPARAMETERS)
        if jsonData is None or len(jsonData) == 0:
            raise RuntimeError('%s._restoreAttributes: json data appear to be corrupted' % self.__class__.__name__)

        self.fromJson(jsonData)

    def _detachFromSpectrum(self):
        """Remove all peakPicker settings from the spectrum

        Sets the spectrum's internal parameter PEAKPICKERPARAMETERS to None and drops the
        reference to the spectrum (self.spectrum becomes None).

        :raises RuntimeError: if no spectrum is defined
        """
        if self.spectrum is None:
            raise RuntimeError('%s._detachFromSpectrum: spectrum not defined' % self.__class__.__name__)

        # remove all links to spectrum
        self.spectrum._setInternalParameter(PEAKPICKERPARAMETERS, None)
        self.spectrum = None

    #=========================================================================================

    def findPeaks(self, data) -> list:
        """find the peaks in data (type numpy-array) and return as a list of SimplePeak instances
        note that SimplePeak.points are ordered z,y,x for nD, in accordance with the numpy nD data array

        called from the pickPeaks() method

        any required parameters that findPeaks method needs should be initialised/set before using the
        setParameters() method. Note that the thresholds are arguments of pickPeaks(), which stores
        them on the instance, replacing any value set earlier; i.e.:

            myPeakPicker = PeakPicker(spectrum=mySpectrum)
            myPeakPicker.setParameters(dropFactor=0.2)

            corePeaks = myPeakPicker.pickPeaks(sliceTuples=[(1, 512), (1, 128)], peakList=myPeakList,
                                               positiveThreshold=1e6, negativeThreshold=None)

        :param data: nD numpy array
        :return list of SimplePeak instances

        To be subclassed
        """
        raise NotImplementedError('%s.findPeaks should be implemented' % self.__class__.__name__)

    def pickPeaks(self, sliceTuples, peakList, positiveThreshold=None, negativeThreshold=None) -> list:
        """Pick peaks in spectral region defined by sliceTuples=[(start_1,stop_1), (start_2,stop_2), ...],
        sliceTuples are 1-based; sliceTuple stop values are inclusive (i.e. different from the python slice object)

        The region is extended by defaultPointExtension points on either side, the data are
        obtained from the spectrum's dataSource, passed to findPeaks() and the resulting
        SimplePeaks are converted into core peaks by _createCorePeaks(). The attributes are
        stored with the spectrum afterwards.

        :param sliceTuples: list of (start,stop) point values per dimension (1-based)
        :param peakList: peakList instance to add newly pickedPeaks
        :param positiveThreshold: stored as self.positiveThreshold (also when None); peaks with a
                                  height above it are accepted; None or 0 means it is not applied
        :param negativeThreshold: stored as self.negativeThreshold (also when None); peaks with a
                                  height below it are accepted; None or 0 means it is not applied
        :return: list of core.Peak instances
        :raises RuntimeError: if no spectrum is defined or the spectrum has no valid path
        """
        if self.spectrum is None:
            raise RuntimeError('%s.spectrum is None' % self.__class__.__name__)

        if not self.spectrum.hasValidPath():
            raise RuntimeError('%s.pickPeaks: spectrum %s, No valid spectral datasource defined' %
                               (self.__class__.__name__, self.spectrum))

        # store the threshold values
        self.positiveThreshold = positiveThreshold
        self.negativeThreshold = negativeThreshold

        self.sliceTuples = sliceTuples
        # NOTE(review): the class-level defaultPointExtension is used here, not the pointExtension
        # trait (which __init__ initialises from it); confirm whether pointExtension was intended
        extension = self.defaultPointExtension
        if extension:
            # add default points to extend pick region; the extension always widens the range,
            # whichever way round (start <= stop or start > stop) the tuple is given
            self.sliceTuples = [(sLeft - extension, sRight + extension) if sLeft <= sRight else
                                (sLeft + extension, sRight - extension)
                                for sLeft, sRight in self.sliceTuples]

        # TODO: use Spectrum aliasing definitions once defined
        data = self.spectrum.dataSource.getRegionData(self.sliceTuples,
                                                      aliasingFlags=[1] * self.spectrum.dimensionCount)

        peaks = self.findPeaks(data)
        getLogger().debug('%s.pickPeaks: found %d peaks in spectrum %s; sliceTuples = %r' %
                          (self.__class__.__name__, len(peaks), self.spectrum, self.sliceTuples))

        corePeaks = []
        if len(peaks) > 0:
            self.lastPickedPeaks = peaks
            corePeaks = self._createCorePeaks(peaks, peakList)

        self._storeAttributes()
        return corePeaks

    def _createCorePeaks(self, peaks, peakList) -> list:
        """
        Create core.Peak instances

        For each SimplePeak the point positions are converted to spectrum positions; a core
        peak is created only if no peak exists yet at that position in peakList and the
        height passes one of the active thresholds.

        :param peaks: a list with simplePeaks
        :param peakList: a core.PeakList instance
        :return: a list with core.Peak instances
        :raises RuntimeError: if the number of points of a peak differs from self.dimensionCount
        """
        from ccpn.core.lib.peakUtils import peakParabolicInterpolation

        corePeaks = []
        for pk in peaks:
            if len(pk.points) != self.dimensionCount:
                raise RuntimeError('%s: invalid dimensionality of points attribute' % pk)

            # correct the peak.points for "offset" (the slice-positions taken) and ordering (i.e. inverse)
            pointPositions = [float(p) + float(self.sliceTuples[idx][0]) for idx, p in enumerate(pk.points[::-1])]

            # check whether a peak already exists at pointPositions in the peakList
            if self._validatePointPeak(pointPositions, peakList):
                if pk.height is None:
                    # height was not defined; get the interpolated value from the data
                    pk.height = self.spectrum.dataSource.getPointValue(pointPositions)

                # a threshold that is None or 0 is falsy and therefore not applied
                if (self.positiveThreshold and pk.height > self.positiveThreshold) or \
                        (self.negativeThreshold and pk.height < self.negativeThreshold):
                    cPeak = peakList.newPeak(pointPositions=pointPositions, height=pk.height, volume=pk.volume,
                                             pointLineWidths=pk.lineWidths)
                    if self.autoFit:
                        peakParabolicInterpolation(cPeak, update=True)
                    corePeaks.append(cPeak)

        return corePeaks

    def _validatePointPeak(self, pointPositions, peakList):
        """
        Check whether a peak already exists at these pointPositions in the supplied peakList

        :param pointPositions: position in points of the position to test
        :param peakList: core.PeakList instance
        :return: True if pointPositions is valid, i.e. position is available
        """
        # fold each position into the range 1..pointCount of its dimension
        intPositions = [int((pos - 1) % pCount) + 1
                        for pos, pCount in zip(pointPositions, self.spectrum.pointCounts)]  # API position starts at 1

        # stop at the first existing peak found at the same (integer) position
        return not any(intPositions == [int(pp) for pp in pk.pointPositions] for pk in peakList.peaks)

    def __str__(self):
        # spectrum is None after _detachFromSpectrum(); do not fail on spectrum.name in that case
        spectrum = getattr(self, 'spectrum', None)
        spectrumName = None if spectrum is None else spectrum.name
        return '<%s for %r>' % (self.__class__.__name__, spectrumName)
#end class from ccpn.util.traits.CcpNmrTraits import Instance from ccpn.util.traits.TraitJsonHandlerBase import CcpNmrJsonClassHandlerABC
class PeakPickerTrait(Instance):
    """Trait holding a PeakPicker instance

    The trait accepts instances of PeakPickerABC (klass) and always allows None.
    """
    klass = PeakPickerABC

    def __init__(self, **kwds):
        """Initialise as an Instance trait for self.klass with allow_none=True

        :param kwds: any further keyword arguments, passed on unchanged to Instance
        """
        super().__init__(klass=self.klass, allow_none=True, **kwds)

    class jsonHandler(CcpNmrJsonClassHandlerABC):
        """Json handler of the trait; nothing is overridden, so the behaviour is that
        of CcpNmrJsonClassHandlerABC (no klass is set on the handler)
        """