Source code for ccpn.core.Multiplet

"""
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2021-08-20 19:26:48 +0100 (Fri, August 20, 2021) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Ed Brooksbank $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

import numpy as np

from ccpn.core._implementation.AbstractWrapperObject import AbstractWrapperObject
from ccpn.core.Project import Project
from ccpn.core.Peak import Peak
from ccpn.core.MultipletList import MultipletList
from ccpnmodel.ccpncore.api.ccp.nmr import Nmr
from ccpnmodel.ccpncore.api.ccp.nmr.Nmr import Multiplet as apiMultiplet
from typing import Optional, Tuple, Any, Union, Sequence, List
from ccpn.util.Common import makeIterableList
from ccpn.util.decorators import logCommand
from ccpn.core.lib.ContextManagers import newObject, ccpNmrV3CoreSetter, undoBlock
from ccpn.util.Logging import getLogger
from ccpn.util.Constants import SCALETOLERANCE


MULTIPLET_TYPES = ['singlet', 'doublet', 'triplet', 'quartet', 'quintet', 'sextet', 'septet', 'octet', 'nonet',
                   'doublet of doublets', 'doublet of triplets', 'triplet of doublets', 'doublet of doublet of doublets']


def _calculateCenterOfMass(multiplet):
    """Calculate the centre of mass of the multiplet peaks
    :param multiplet: multiplet obj containing peaks.
    :return: the center of mass of the multiplet that can be used as peak position
             if you consider the multiplet as a single peak
    """

    try:
        _peaks = multiplet.peaks
        _lenPeaks = len(_peaks)
        if _lenPeaks > 0:
            position = ()
            dim = multiplet.multipletList.spectrum.dimensionCount
            if dim > 1:
                for d in range(dim):
                    peakPositions = [peak.position[d] for peak in _peaks]
                    # peakIntensities = [peak.height or 1 for peak in peaks]
                    # numerator = []
                    # for p, i in zip(peakPositions, peakIntensities):
                    #     numerator.append(p * i)
                    # centerOfMass = sum(numerator) / sum(peakIntensities)
                    # position += (centerOfMass,)

                    position += (sum(peakPositions) / _lenPeaks,)
            else:
                position = (sum([peak.position[0] for peak in _peaks]) / _lenPeaks,
                            sum([peak.height for peak in _peaks]) / _lenPeaks)
            return position
    except:
        return None


def _calculateCenterOfMassPoints(multiplet):
    """Calculate the centre of mass of the multiplet peaks (in points)
    :param multiplet: multiplet obj containing peaks.
    :return: the center of mass of the multiplet that can be used as peak position
             if you consider the multiplet as a single peak
    """

    try:
        _peaks = multiplet.peaks
        _lenPeaks = len(_peaks)
        if _lenPeaks > 0:
            position = ()
            dim = multiplet.multipletList.spectrum.dimensionCount
            if dim > 1:
                for d in range(dim):
                    peakPositions = [peak.pointPositions[d] for peak in _peaks]
                    position += (sum(peakPositions) / _lenPeaks,)
            else:
                position = (sum([peak.pointPositions[0] for peak in _peaks]) / _lenPeaks,
                            sum([peak.height for peak in _peaks]) / _lenPeaks)
            return position
    except:
        return None


def _getMultipletHeight(multiplet):
    """Derive the highest peak intensity from the multiplet peaks"""
    # NOTE:ED - a more accurate method may be needed here
    if len(multiplet.peaks) > 0:
        heights = [(peak.height or 0.0) for peak in multiplet.peaks]
        return heights[int(np.argmax(np.abs(heights)))]


def _getMultipletHeightError(multiplet):
    """Derive the height error from the multiplet peaks"""
    if len(multiplet.peaks) > 0:
        heightErrors = [(peak.heightError or 0.0) for peak in multiplet.peaks]
        return heightErrors[int(np.argmax(np.abs(heightErrors)))]


[docs]class Multiplet(AbstractWrapperObject): """Multiplet object, holding position, intensity, and assignment information Measurements that require more than one NmrAtom for an individual assignment (such as splittings, J-couplings, MQ dimensions, reduced-dimensionality experiments etc.) are not supported (yet). Assignments can be viewed and set either as a list of assignments for each dimension (dimensionNmrAtoms) or as a list of all possible assignment combinations (assignedNmrAtoms)""" #: Short class name, for PID. shortClassName = 'MT' # Attribute it necessary as subclasses must use superclass className className = 'Multiplet' _parentClass = MultipletList #: Name of plural link to instances of class _pluralLinkName = 'multiplets' #: List of child classes. _childClasses = [] # Qualified name of matching API class _apiClassQualifiedName = apiMultiplet._metaclass.qualifiedName() # the attribute name used by current _currentAttributeName = 'multiplets' # CCPN properties @property def _apiMultiplet(self) -> apiMultiplet: """API multiplets matching Multiplet.""" return self._wrappedData @property def _key(self) -> str: """id string - serial number converted to string.""" return str(self._wrappedData.serial) @property def serial(self) -> int: """serial number of Multiplet, used in Pid and to identify the Multiplet.""" return self._wrappedData.serial @property def _parent(self) -> Optional[MultipletList]: """parent containing multiplet.""" return self._project._data2Obj[self._wrappedData.multipletList] multipletList = _parent @property def height(self) -> Optional[float]: """height of Multiplet.""" height = _getMultipletHeight(self) return height # # cannot set the height - derived from peaks # @height.setter # def height(self, value: float): # self._wrappedData.height = value @property def heightError(self) -> Optional[float]: """height error of Multiplet.""" heightError = _getMultipletHeightError(self) return heightError # # cannot set the heightError - derived from peaks # @heightError.setter # def heightError(self, value: float): # self._wrappedData.heightError = value @property def volume(self) -> Optional[float]: """volume of Multiplet.""" if self._wrappedData.volume is None: return None scale = self.multipletList.spectrum.scale scale = scale if scale is not None else 1.0 if -SCALETOLERANCE < scale < SCALETOLERANCE: getLogger().warning('Scaling {}.volume by minimum tolerance (±{})'.format(self, SCALETOLERANCE)) return self._wrappedData.volume * scale @volume.setter @logCommand(get='self', isProperty=True) def volume(self, value: Union[float, int, None]): if not isinstance(value, (float, int, type(None))): raise TypeError('volume must be a float, integer or None') elif value is not None and (value - value) != 0.0: raise TypeError('volume cannot be NaN or Infinity') if value is None: self._wrappedData.volume = None else: scale = self.multipletList.spectrum.scale scale = scale if scale is not None else 1.0 if -SCALETOLERANCE < scale < SCALETOLERANCE: getLogger().warning('Scaling {}.volume by minimum tolerance (±{})'.format(self, SCALETOLERANCE)) self._wrappedData.volume = None else: self._wrappedData.volume = float(value) / scale @property def offset(self) -> Optional[float]: """offset of Multiplet.""" return self._wrappedData.offset @offset.setter @logCommand(get='self', isProperty=True) def offset(self, value: float): self._wrappedData.offset = value @property def constraintWeight(self) -> Optional[float]: """constraintWeight of Multiplet.""" return self._wrappedData.constraintWeight @constraintWeight.setter @logCommand(get='self', isProperty=True) def constraintWeight(self, value: float): self._wrappedData.constraintWeight = value @property def volumeError(self) -> Optional[float]: """volume error of Multiplet.""" if self._wrappedData.volumeError is None: return None scale = self.multipletList.spectrum.scale scale = scale if scale is not None else 1.0 if -SCALETOLERANCE < scale < SCALETOLERANCE: getLogger().warning('Scaling {}.volumeError by minimum tolerance (±{})'.format(self, SCALETOLERANCE)) return self._wrappedData.volumeError * scale @volumeError.setter @logCommand(get='self', isProperty=True) def volumeError(self, value: Union[float, int, None]): if not isinstance(value, (float, int, type(None))): raise TypeError('volumeError must be a float, integer or None') elif value is not None and (value - value) != 0.0: raise TypeError('volumeError cannot be NaN or Infinity') if value is None: self._wrappedData.volumeError = None else: scale = self.multipletList.spectrum.scale scale = scale if scale is not None else 1.0 if -SCALETOLERANCE < scale < SCALETOLERANCE: getLogger().warning('Scaling {}.volumeError by minimum tolerance (±{})'.format(self, SCALETOLERANCE)) self._wrappedData.volumeError = None else: self._wrappedData.volumeError = float(value) / scale @property def figureOfMerit(self) -> Optional[float]: """figureOfMerit of Multiplet, between 0.0 and 1.0 inclusive.""" return self._wrappedData.figOfMerit @figureOfMerit.setter @logCommand(get='self', isProperty=True) def figureOfMerit(self, value: float): self._wrappedData.figOfMerit = value @property def annotation(self) -> Optional[str]: """Multiplet text annotation.""" return self._wrappedData.annotation @annotation.setter @logCommand(get='self', isProperty=True) def annotation(self, value: str): self._wrappedData.annotation = value @property def slopes(self) -> List[float]: """slope (in dimension order) used in calculating multiplet value.""" return self._wrappedData.slopes @slopes.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSetter() def slopes(self, value): self._wrappedData.slopes = value @property def limits(self) -> List[Tuple[float, float]]: """limits (in dimension order) of the multiplet.""" return self._wrappedData.limits @limits.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSetter() def limits(self, value): self._wrappedData.limits = value @property def pointLimits(self) -> List[Tuple[float, float]]: """pointLimits (in dimension order) of the multiplet.""" return self._wrappedData.pointLimits @pointLimits.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSetter() def pointLimits(self, value): self._wrappedData.pointLimits = value @property def axisCodes(self) -> Tuple[str, ...]: """Spectrum axis codes in dimension order matching position.""" return self.multipletList.spectrum.axisCodes @property def peaks(self) -> Optional[Tuple[Any, ...]]: """List of peaks attached to the multiplet.""" if self._wrappedData: return tuple([self._project._data2Obj[pk] for pk in self._wrappedData.sortedPeaks() if pk in self._project._data2Obj]) else: return () @peaks.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSetter() def peaks(self, ll: list): if ll: pll = makeIterableList(ll) pks = [self.project.getByPid(peak) if isinstance(peak, str) else peak for peak in pll] for pp in pks: if not isinstance(pp, Peak): raise TypeError('%s is not of type Peak' % pp) toRemove = [pk for pk in self.peaks if pk not in pks] toAdd = [pk for pk in pks if pk not in self.peaks] self.removePeaks(toRemove) self.addPeaks(toAdd) @property def numPeaks(self) -> int: """return number of peaks in the multiplet.""" return len(self._wrappedData.sortedPeaks()) @property def position(self) -> Optional[Tuple[float, ...]]: """Multiplet position in ppm (or other relevant unit) in dimension order calculated as Center Of Mass. """ return _calculateCenterOfMass(self) ppmPositions = position # @property # def ppmPositions(self) -> Optional[Tuple[float, ...]]: # """Peak position in ppm (or other relevant unit) in dimension order calculated as Center Of Mass.""" # result = None # try: # pks = self.peaks # # pksPos = [pp.position for pp in pks] # if pks: # # self._position = tuple(sum(item) for item in zip(*pksPos)) # self._position = _calculateCenterOfMass(self) # result = self._position # # finally: # return result @property def positionError(self) -> Tuple[Optional[float], ...]: """Peak position error in ppm (or other relevant unit).""" # TODO:LUCA calculate this :) return tuple() # tuple(x.valueError for x in self._wrappedData.sortedPeaks()) @property def pointPositions(self) -> Optional[Tuple[float, ...]]: """Multiplet position in points (or other relevant unit) in dimension order calculated as Center Of Mass. """ return _calculateCenterOfMassPoints(self) @property def boxWidths(self) -> Tuple[Optional[float], ...]: """The full width of the peak footprint in points for each dimension, i.e. the width of the area that should be considered for integration, fitting, etc. .""" return tuple(x.boxWidth for x in self._wrappedData.sortedPeaks()) @property def lineWidths(self) -> Tuple[Optional[float], ...]: """Full-width-half-height of peak/multiplet for each dimension. """ result = tuple() pks = self.peaks pksWidths = [pp.lineWidths for pp in pks] try: result = tuple(sum(item) for item in zip(*pksWidths)) except Exception as es: if pks: result = list(pksWidths[0]) for otherPks in pksWidths[1:]: for ii in range(len(result)): result[ii] += otherPks[ii] else: result = self._wrappedData.lineWidths finally: return result @lineWidths.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSetter() def lineWidths(self, value): # NOTE:ED - check value is a tuple, etc. self._wrappedData.lineWidths = value # check what the peak is doing ppmLineWidths = lineWidths @property def pointLineWidths(self) -> Tuple[Optional[float], ...]: """Full-width-half-height of peak for each dimension, in points.""" # currently assumes that internal storage is in ppms result = tuple() pks = self.peaks pksWidths = [pp.pointLineWidths for pp in pks] try: result = tuple(sum(item) for item in zip(*pksWidths)) except Exception as es: if pks: result = list(pksWidths[0]) for otherPks in pksWidths[1:]: for ii in range(len(result)): result[ii] += otherPks[ii] else: result = self._wrappedData.lineWidths finally: return result @property def aliasing(self) -> Tuple[Optional[float], ...]: """Aliasing.""" # Returns the aliasing for first connected peak # quickest for the moment - need to imagine case where peaks are not from the same aliased region if self.peaks: return self.peaks[0].aliasing #========================================================================================= # Implementation functions #========================================================================================= # from ccpnmodel.ccpncore.api.memops import Implementation as ApiImplementation # # def __init__(self, project: 'Project', wrappedData: ApiImplementation.DataObject): # super().__init__(project, wrappedData) # # # attach a notifier to the peaks # from ccpn.core.lib.Notifiers import Notifier # # for pp in self.peaks: # Notifier(pp, ['observe'], Notifier.ANY, # callback=self._propagateAction, # onceOnly=True, debug=True) @classmethod def _getAllWrappedData(cls, parent: MultipletList) -> Tuple[apiMultiplet, ...]: """get wrappedData (Multiplets) for all Multiplet children of parent MultipletList""" return parent._wrappedData.sortedMultiplets() #========================================================================================= # CCPN functions #=========================================================================================
[docs] @logCommand(get='self') def addPeaks(self, peaks: Sequence[Union['Peak', str]]): """ Add a peak or list of peaks to the Multiplet. The peaks must belong to the spectrum containing the multipletList. :param peaks: single peak or list of peaks as objects or pids. """ spectrum = self._parent.spectrum peakList = makeIterableList(peaks) pks = [] for peak in peakList: pks.append(self.project.getByPid(peak) if isinstance(peak, str) else peak) for pp in pks: if not isinstance(pp, Peak): raise TypeError('%s is not of type Peak' % pp) if pp not in spectrum.peaks: raise ValueError('%s does not belong to spectrum: %s' % (pp.pid, spectrum.pid)) with undoBlock(): for pk in pks: if pk not in self.peaks: # ignore duplicates self._wrappedData.addPeak(pk._wrappedData)
[docs] @logCommand(get='self') def removePeaks(self, peaks: Sequence[Union['Peak', str]]): """ Remove a peak or list of peaks from the Multiplet. The peaks must belong to the multiplet. :param peaks: single peak or list of peaks as objects or pids """ spectrum = self._parent.spectrum peakList = makeIterableList(peaks) pks = [] for peak in peakList: pks.append(self.project.getByPid(peak) if isinstance(peak, str) else peak) for pp in pks: if not isinstance(pp, Peak): raise TypeError('%s is not of type Peak' % pp) if pp not in self.peaks: raise ValueError('%s does not belong to multiplet: %s' % (pp.pid, self.pid)) if pp not in spectrum.peaks: raise ValueError('%s does not belong to spectrum: %s' % (pp.pid, spectrum.pid)) with undoBlock(): for pk in pks: self._wrappedData.removePeak(pk._wrappedData)
def _propagateAction(self, data): from ccpn.core.lib.Notifiers import Notifier trigger = data[Notifier.TRIGGER] trigger = 'change' if trigger == 'observe' else trigger if trigger in ['change']: self._finaliseAction(trigger)
#=========================================================================================== # new'Object' and other methods # Call appropriate routines in their respective locations #=========================================================================================== #========================================================================================= # Connections to parents #========================================================================================= @newObject(Multiplet) def _newMultiplet(self: MultipletList, height: float = 0.0, heightError: float = 0.0, volume: float = 0.0, volumeError: float = 0.0, offset: float = 0.0, constraintWeight: float = 0.0, figureOfMerit: float = 1.0, annotation: str = None, comment: str = None, limits: Sequence[Tuple[float, float]] = (), slopes: List[float] = (), pointLimits: Sequence[Tuple[float, float]] = (), peaks: Sequence[Union['Peak', str]] = ()) -> Multiplet: """Create a new Multiplet within a multipletList See the Multiplet class for details. :param height: :param heightError: :param volume: :param volumeError: :param offset: :param constraintWeight: :param figureOfMerit: :param annotation: :param comment: :param limits: :param slopes: :param pointLimits: :param peaks: :return: a new Multiplet instance. """ spectrum = self.spectrum peakList = makeIterableList(peaks) pks = [] for peak in peakList: pks.append(self.project.getByPid(peak) if isinstance(peak, str) else peak) for pp in pks: if not isinstance(pp, Peak): raise TypeError('%s is not of type Peak' % pp) if pp not in spectrum.peaks: raise ValueError('%s does not belong to spectrum: %s' % (pp.pid, spectrum.pid)) dd = {'height' : height, 'heightError': heightError, 'volume' : volume, 'volumeError': volumeError, 'offset': offset, 'slopes': slopes, 'figOfMerit': figureOfMerit, 'constraintWeight': constraintWeight, 'annotation': annotation, 'details': comment, 'limits' : limits, 'pointLimits': pointLimits} if pks: dd['peaks'] = [pk._wrappedData for pk in pks] # remove items that can't be set to None in the model if not offset: del dd['offset'] if not constraintWeight: del dd['constraintWeight'] apiParent = self._apiMultipletList apiMultiplet = apiParent.newMultiplet(multipletType='multiplet', **dd) result = self._project._data2Obj.get(apiMultiplet) if result is None: raise RuntimeError('Unable to generate new Multiplet item') return result # EJB 20181127: removed # Multiplet._parentClass.newMultiplet = _newMultiplet # del _newMultiplet # EJB 20181128: removed, to be added to multiplet __init__? # Notify Multiplets when the contents of peaks have changed # i.e PeakDim references Project._apiNotifiers.append( ('_notifyRelatedApiObject', {'pathToObject': 'peak.multiplets', 'action': 'change'}, Nmr.PeakDim._metaclass.qualifiedName(), '') )