# Start of code

import numpy as np
from typing import Sequence, Optional

from ccpn.core.lib.AxisCodeLib import getAxisCodeMatchIndices, _axisCodeMapIndices

from ccpn.util import Common as commonUtil
from ccpn.core.Spectrum import Spectrum
from ccpnmodel.ccpncore.api.ccp.nmr.Nmr import PeakList as ApiPeakList
# from ccpnmodel.ccpncore.lib._ccp.nmr.Nmr.PeakList import pickNewPeaks
from ccpn.util.decorators import logCommand
from ccpn.core.lib.ContextManagers import newObject, undoBlockWithoutSideBar
from ccpn.util.Logging import getLogger
from ccpn.core._implementation.PMIListABC import PMIListABC


[docs]class PeakList(PMIListABC): """An object containing Peaks. Note: the object is not a (subtype of a) Python list. To access all Peak objects, use PeakList.peaks.""" #: Short class name, for PID. shortClassName = 'PL' # Attribute it necessary as subclasses must use superclass className className = 'PeakList' _parentClass = Spectrum #: Name of plural link to instances of class _pluralLinkName = 'peakLists' #: List of child classes. _childClasses = [] # Qualified name of matching API class _apiClassQualifiedName = ApiPeakList._metaclass.qualifiedName() #========================================================================================= # CCPN properties #========================================================================================= @property def _apiPeakList(self) -> ApiPeakList: """API peakLists matching PeakList.""" return self._wrappedData def _setPrimaryChildClass(self): """Set the primary classType for the child list attached to this container """ from ccpn.core.Peak import Peak as klass if not klass in self._childClasses: raise TypeError('PrimaryChildClass %s does not exist as child of %s' % (klass.className, self.className)) self._primaryChildClass = klass @property def chemicalShiftList(self): """STUB: hot-fixed later""" return None #========================================================================================= # Implementation functions #========================================================================================= @classmethod def _getAllWrappedData(cls, parent: Spectrum) -> list: """get wrappedData (PeakLists) for all PeakList children of parent Spectrum""" return [x for x in parent._wrappedData.sortedPeakLists() if x.dataType == 'Peak'] def _finaliseAction(self, action: str): """Subclassed to notify changes to associated peakListViews """ if not super()._finaliseAction(action): return # this is a can-of-worms for undelete at the minute try: if action in ['change']: for plv in self.peakListViews: plv._finaliseAction(action) except Exception as es: raise RuntimeError('Error _finalising peakListViews: %s' % str(es))
[docs] def delete(self): """Delete peakList """ # call the delete method from the parent class self._parent._deletePeakList(self)
#========================================================================================= # CCPN functions #=========================================================================================
[docs] def pickPeaksNd(self, regionToPick: Sequence[float] = None, doPos: bool = True, doNeg: bool = True, fitMethod: str = GAUSSIANMETHOD, excludedRegions=None, excludedDiagonalDims=None, excludedDiagonalTransform=None, minDropFactor: float = 0.1): getLogger().warning('Deprecated method. Use spectrum.pickPeaks instead') from ccpn.core.lib.PeakListLib import _pickPeaksNd return _pickPeaksNd(self, regionToPick=regionToPick, doPos=doPos, doNeg=doNeg, fitMethod=fitMethod, excludedRegions=excludedRegions, excludedDiagonalDims=excludedDiagonalDims, excludedDiagonalTransform=excludedDiagonalTransform, minDropFactor=minDropFactor)
[docs] @logCommand(get='self') def estimateVolumes(self, volumeIntegralLimit=2.0): """Estimate the volumes for the peaks in this peakList The width of the volume integral in each dimension is the lineWidth * volumeIntegralLimit, the default is 2.0 * FWHM of the peak. :param volumeIntegralLimit: integral width as a multiple of lineWidth (FWHM) """ with undoBlockWithoutSideBar(): for pp in self.peaks: # estimate the volume for each peak height = pp.height lineWidths = pp.lineWidths if lineWidths and None not in lineWidths and height: pp.estimateVolume(volumeIntegralLimit=volumeIntegralLimit) else: getLogger().warning('Peak %s contains undefined height/lineWidths' % str(pp))
[docs] @logCommand(get='self') def copyTo(self, targetSpectrum: Spectrum, targetPeakList=None, includeAllPeakProperties=True, **kwargs) -> 'PeakList': """ Copy the origin PeakList peaks to a targetSpectrum. If targetPeakList is given, peaks will be added to it, otherwise a new PeakList is created (default behaviour). return the target PeakList with the newly copied peaks. :param targetSpectrum: object: Core.Spectrum or Str: Pid :param targetPeakList: object: Core.PeakList or Str: Pid :param kwargs: any extra PeakList attributes for newly created peakLists. Not used if it is given a targetPeakList """ singleValueTags = ['isSimulated', 'symbolColour', 'symbolStyle', 'textColour', 'textColour', 'title', 'comment', 'meritThreshold', 'meritEnabled', 'meritColour'] targetSpectrum = self.project.getByPid(targetSpectrum) if isinstance(targetSpectrum, str) else targetSpectrum if not targetSpectrum: raise TypeError('targetSpectrum not defined') if not isinstance(targetSpectrum, Spectrum): raise TypeError('targetSpectrum is not of type Spectrum') # checking targetSpectrum for compatibility # TODO enable copying across different dimensionalities dimensionCount = self.spectrum.dimensionCount if dimensionCount < targetSpectrum.dimensionCount: raise ValueError("Cannot copy %sD %s to %sD %s" % (dimensionCount, self.longPid, targetSpectrum.dimensionCount, targetSpectrum.longPid)) dimensionMapping = _axisCodeMapIndices(self.spectrum.axisCodes,targetSpectrum.axisCodes) if None in dimensionMapping: raise ValueError("%s axisCodes %r not compatible with targetSpectrum axisCodes %r" % (self, self.spectrum.axisCodes, targetSpectrum.axisCodes)) if targetPeakList: targetPeakList = self.project.getByPid(targetPeakList) if isinstance(targetPeakList,str) else targetPeakList if not isinstance(targetPeakList, PeakList): raise TypeError('targetPeakList is not of type PeakList') if not targetPeakList in targetSpectrum.peakLists: raise TypeError('targetPeakList is not a PeakList of: %s' else: # make a dictionary with parameters of self to be copied to new targetPeakList (if created) params = dict(((tag, getattr(self, tag)) for tag in singleValueTags)) params['comment'] = "Copy of %s\n" % self.longPid + (params['comment'] or '') for key, val in kwargs.items(): if key in singleValueTags: params[key] = val else: raise ValueError("PeakList has no attribute %s" % key) with undoBlockWithoutSideBar(): if not targetPeakList: targetPeakList = targetSpectrum.newPeakList(**params) for peak in self.peaks: peak.copyTo(targetPeakList, includeAllProperties=includeAllPeakProperties) return targetPeakList
[docs] @logCommand(get='self') def subtractPeakLists(self, peakList: 'PeakList') -> 'PeakList': """ Subtracts peaks in peakList2 from peaks in peakList1, based on position, and puts those in a new peakList3. Assumes a common spectrum for now. """ def _havePeakNearPosition(values, tolerances, peaks) -> Optional['Peak']: for peak in peaks: for i, position in enumerate(peak.position): if abs(position - values[i]) > tolerances[i]: break else: return peak peakList = self.project.getByPid(peakList) if isinstance(peakList, str) else peakList if not peakList: raise TypeError('peakList not defined') if not isinstance(peakList, PeakList): raise TypeError('peakList is not of type PeakList') # with logCommandBlock(prefix='newPeakList=', get='self') as log: # peakStr = '[' + ','.join(["'%s'" % for peak in peakList2]) + ']' # log('subtractPeakLists', peaks=peakStr) with undoBlockWithoutSideBar(): spectrum = self.spectrum assert spectrum is peakList.spectrum, 'For now requires both peak lists to be in same spectrum' # dataDims = spectrum.sortedDataDims() tolerances = self.spectrum.assignmentTolerances peaks2 = peakList.peaks peakList3 = spectrum.newPeakList() for peak1 in self.peaks: values1 = [peak1.position[dim] for dim in range(len(peak1.position))] if not _havePeakNearPosition(values1, tolerances, peaks2): peakList3.newPeak(height=peak1.height, volume=peak1.volume, figureOfMerit=peak1.figureOfMerit, annotation=peak1.annotation, ppmPositions=peak1.position, pointPositions=peak1.pointPositions) return peakList3
[docs] @logCommand(get='self') def restrictedPick(self, positionCodeDict, doPos, doNeg): codes = list(positionCodeDict.keys()) positions = [positionCodeDict[code] for code in codes] # match the spectrum to the restricted codes, these are the only ones to update indices = getAxisCodeMatchIndices(self.spectrum.axisCodes, codes) # divide by 2 to get the double-width tolerance, i.e. the width of the region - CHECK WITH GEERTEN tolerances = tuple(tol / 2 for tol in self.spectrum.assignmentTolerances) limits = [sorted(lims) for lims in self.spectrum.spectrumLimits] selectedRegion = [] minDropFactor = self.project.application.preferences.general.peakDropFactor with undoBlockWithoutSideBar(): for ii, ind in enumerate(indices): if ind is not None and positions[ind] is not None: selectedRegion.insert(ii, [positions[ind] - tolerances[ii], positions[ind] + tolerances[ii]]) else: selectedRegion.insert(ii, [limits[ii][0], limits[ii][1]]) # regionToPick = selectedRegion # peaks = self.pickPeaksNd(regionToPick, doPos=doPos, doNeg=doNeg, minDropFactor=minDropFactor) # axisCodeDict = dict((code, selectedRegion[ii]) for ii, code in enumerate(self.spectrum.axisCodes)) axisCodeDict = dict((code, region) for ii, (code, region) in enumerate(zip(self.spectrum.axisCodes, selectedRegion))) _spectrum = self.spectrum # may create a peakPicker instance if not defined, subject to settings in preferences _peakPicker = _spectrum.peakPicker if _peakPicker: _peakPicker.dropFactor = minDropFactor _peakPicker.setLineWidths = True peaks = _spectrum.pickPeaks(self, _spectrum.positiveContourBase if doPos else None, _spectrum.negativeContourBase if doNeg else None, **axisCodeDict) return peaks return []
[docs] def reorderValues(self, values, newAxisCodeOrder): """Reorder values in spectrum dimension order to newAxisCodeOrder by matching newAxisCodeOrder to spectrum axis code order""" return commonUtil.reorder(values, self._parent.axisCodes, newAxisCodeOrder)
[docs] @logCommand(get='self') def pickPeaksRegion(self, regionToPick: dict = {}, doPos: bool = True, doNeg: bool = True, minLinewidth=None, exclusionBuffer=None, minDropFactor: float = 0.1, checkAllAdjacent: bool = True, fitMethod: str = PARABOLICMETHOD, excludedRegions=None, excludedDiagonalDims=None, excludedDiagonalTransform=None, estimateLineWidths=True): getLogger().warning('Deprecated, please use spectrum.pickPeaks()') from ccpn.core.lib.PeakListLib import _pickPeaksRegion with undoBlockWithoutSideBar(): peaks = _pickPeaksRegion(self, regionToPick=regionToPick, doPos=doPos, doNeg=doNeg, minLinewidth=minLinewidth, exclusionBuffer=exclusionBuffer, minDropFactor=minDropFactor, checkAllAdjacent=checkAllAdjacent, fitMethod=fitMethod, excludedRegions=excludedRegions, excludedDiagonalDims=excludedDiagonalDims, excludedDiagonalTransform=excludedDiagonalTransform, estimateLineWidths=estimateLineWidths) return peaks
[docs] def fitExistingPeaks(self, peaks: Sequence['Peak'], fitMethod: str = GAUSSIANMETHOD, singularMode: bool = True, halfBoxSearchWidth: int = 2, halfBoxFitWidth: int = 2): """Refit the current selected peaks. Must be called with peaks that belong to this peakList """ from ccpn.core.lib.PeakListLib import _fitExistingPeaks # getLogger().warning('Deprecated, please use spectrum.fitExistingPeaks()') #comment-out until it is clear what is the new routine to use instead. return _fitExistingPeaks(self, peaks=peaks, fitMethod=fitMethod, singularMode=singularMode, halfBoxSearchWidth=halfBoxSearchWidth, halfBoxFitWidth=halfBoxFitWidth)
[docs] @logCommand(get='self') def calculateClusterIds(self, tolerances=None, clustererName=None): """ Calculate clusterIDs for peaks using the in Depth-First-Search (DFS) algorithm. """ from ccpn.core.lib.PeakClustering import PeakClusterers, DFSPeakClusterer if tolerances is None: defaultTolerancePoints = 8 tolerances = [defaultTolerancePoints]*self.spectrum.dimensionCount clusterer = PeakClusterers.get(clustererName, DFSPeakClusterer) peakClusterer = clusterer(self.peaks, tolerances) clusters = peakClusterer.findClusters() peakClusterer.setClusterIdToPeaks(clusters) return clusters
[docs] @logCommand(get='self') def resetClusterIds(self): """ Reset clusterIDs to a default enumeration. """ with undoBlockWithoutSideBar(): for i, peak in enumerate(self.peaks): peak.clusterId = i + 1
[docs] def getPeakAliasingRanges(self): """Return the min/max aliasing values for the peaks in the list, if there are no peaks, return None """ if not self.peaks: return None # calculate the min/max aliasing values for the spectrum dims = self.spectrum.dimensionCount aliasMin = [0] * dims aliasMax = [0] * dims for peak in self.peaks: alias = peak.aliasing aliasMax = np.maximum(aliasMax, alias) aliasMin = np.minimum(aliasMin, alias) # set min/max in spectrum here if peaks have been found aliasRanges = tuple((int(mn), int(mx)) for mn, mx in zip(aliasMin, aliasMax)) return aliasRanges
[docs] @logCommand(get='self') def reorderPeakListAxes(self, newAxisOrder): """Reorder the peak position according to the newAxisOrder """ dims = self.spectrum.dimensionCount if not isinstance(newAxisOrder, (list, tuple)): raise TypeError('newAxisOrder must be a list/tuple') if len(newAxisOrder) != dims: raise ValueError('newAxisOrder is the wrong length, must match spectrum dimensions') if len(set(newAxisOrder)) != len(newAxisOrder): raise ValueError('newAxisOrder contains duplicated elements') if not all(isinstance(ii, int) for ii in newAxisOrder): raise ValueError('newAxisOrder must be ints') if not all(0 <= ii < dims for ii in newAxisOrder): raise ValueError('newAxisOrder elements must be in range 0-%i', dims - 1) with undoBlockWithoutSideBar(): # reorder all peaks in the peakList for peak in self.peaks: pos = peak.position newPos = [] for ii in newAxisOrder: newPos.append(pos[ii]) peak.position = newPos
#=========================================================================================== # new'Object' and other methods # Call appropriate routines in their respective locations #===========================================================================================
[docs] @logCommand(get='self') def newPeak(self, ppmPositions: Sequence[float] = (), height: float = None, comment: str = None, **kwds): """Create a new Peak within a peakList. See the Peak class for details. Optional keyword arguments can be passed in; see Peak._newPeak for details. NB you must create the peak before you can assign it. The assignment attributes are: - assignments (assignedNmrAtoms) - A tuple of all (e.g.) assignment triplets for a 3D spectrum - assignmentsByDimensions (dimensionNmrAtoms) - A tuple of tuples of assignments, one for each dimension :param ppmPositions: peak position in ppm for each dimension (related attributes: positionError, pointPositions) :param height: height of the peak (related attributes: volume, volumeError, lineWidths) :param comment: optional comment string :return: a new Peak instance. """ from ccpn.core.Peak import _newPeak # imported here to avoid circular imports return _newPeak(self, ppmPositions=ppmPositions, height=height, comment=comment, **kwds)
[docs] @logCommand(get='self') def newPickedPeak(self, pointPositions: Sequence[float] = None, height: float = None, lineWidths: Sequence[float] = (), fitMethod: str = PARABOLICMETHOD, **kwds): """Create a new Peak within a peakList from a picked peak See the Peak class for details. Optional keyword arguments can be passed in; see Peak._newPickedPeak for details. :param height: height of the peak (related attributes: volume, volumeError, lineWidths) :param pointPositions: peak position in points for each dimension (related attributes: positionError, pointPositions) :param fitMethod: type of curve fitting :param lineWidths: :param serial: optional serial number. :return: a new Peak instance. """ from ccpn.core.Peak import _newPickedPeak # imported here to avoid circular imports return _newPickedPeak(self, pointPositions=pointPositions, height=height, lineWidths=lineWidths, fitMethod=fitMethod, **kwds)
#========================================================================================= # Connections to parents: #========================================================================================= @newObject(PeakList) def _newPeakList(self: Spectrum, title: str = None, comment: str = None, symbolStyle: str = None, symbolColour: str = None, textColour: str = None, meritColour: str = None, meritEnabled: bool = False, meritThreshold: float = None, lineColour: str = None, isSimulated: bool = False) -> PeakList: """Create new empty PeakList within Spectrum See the PeakList class for details. :param title: :param comment: :param isSimulated: :param symbolStyle: :param symbolColour: :param textColour: :return: a new PeakList instance. """ dd = {'name': title, 'details': comment, 'isSimulated': isSimulated} if symbolColour: dd['symbolColour'] = symbolColour if symbolStyle: dd['symbolStyle'] = symbolStyle if textColour: dd['textColour'] = textColour apiDataSource = self._apiDataSource apiPeakList = apiDataSource.newPeakList(**dd) result = self._project._data2Obj.get(apiPeakList) if result is None: raise RuntimeError('Unable to generate new PeakList item') # set non-api attributes if meritColour is not None: result.meritColour = meritColour if meritEnabled is not None: result.meritEnabled = meritEnabled if meritThreshold is not None: result.meritThreshold = meritThreshold if lineColour is not None: result.lineColour = lineColour return result # for sp in project.spectra: # c = sp.positiveContourColour # sp.peakLists[-1].symbolColour = c