Source code for ccpn.core.IntegralList

"""
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2021-06-25 17:35:46 +0100 (Fri, June 25, 2021) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

from typing import List, Tuple
import numpy as np
from scipy import signal

from ccpnmodel.ccpncore.api.ccp.nmr.Nmr import IntegralList as ApiIntegralList
from ccpn.core.Spectrum import Spectrum
from ccpn.core._implementation.PMIListABC import PMIListABC
from ccpn.core.lib.SpectrumLib import _oldEstimateNoiseLevel1D, estimateNoiseLevel1D, _filterROI1Darray
from ccpn.core.lib.ContextManagers import newObject
from ccpn.util.decorators import logCommand
from ccpn.util.Logging import getLogger


# moved on peakUtil ####################################################################
def _createIntersectingLine(x, y):
    """create a straight line with x values like the original spectrum and y value from the estimated noise level"""
    return [_oldEstimateNoiseLevel1D(x, y)] * len(x)


def _getIntersectionPoints(x, y, line):
    """
    :param line: x points of line to intersect y points
    :return: list of intersecting points
    """
    z = y - line
    dx = x[1:] - x[:-1]
    cross = np.sign(z[:-1] * z[1:])

    x_intersect = x[:-1] - dx / (z[1:] - z[:-1]) * z[:-1]
    negatives = np.where(cross < 0)
    points = x_intersect[negatives]
    return points


def _pairIntersectionPoints(intersectionPoints):
    """ Yield successive pair chunks from list of intersectionPoints """
    for i in range(0, len(intersectionPoints), 2):
        pair = intersectionPoints[i:i + 2]
        if len(pair) == 2:
            yield pair


def _getPeaksLimits(x, y, intersectingLine=None):
    """Get the limits of each peak of the spectrum given an intersecting line. If
     intersectingLine is None, it is calculated by the STD of the spectrum"""
    if intersectingLine is None:
        intersectingLine = _createIntersectingLine(x, y)
    limits = _getIntersectionPoints(x, y, intersectingLine)
    limitsPairs = list(_pairIntersectionPoints(limits))
    return limitsPairs


########################################################################################################################################


[docs]class IntegralList(PMIListABC): """An object containing Integrals. Note: the object is not a (subtype of a) Python list. To access all Integral objects, use integralList.integrals.""" #: Short class name, for PID. shortClassName = 'IL' # Attribute it necessary as subclasses must use superclass className className = 'IntegralList' _parentClass = Spectrum #: Name of plural link to instances of class _pluralLinkName = 'integralLists' #: List of child classes. _childClasses = [] # Qualified name of matching API class - NB shared with PeakList _apiClassQualifiedName = ApiIntegralList._metaclass.qualifiedName() #========================================================================================= # CCPN properties #========================================================================================= @property def _apiIntegralList(self) -> ApiIntegralList: """API peakLists matching IntegralList.""" return self._wrappedData def _setPrimaryChildClass(self): """Set the primary classType for the child list attached to this container """ from ccpn.core.Integral import Integral as klass if not klass in self._childClasses: raise TypeError('PrimaryChildClass %s does not exist as child of %s' % (klass.className, self.className)) self._primaryChildClass = klass #========================================================================================= # Implementation functions #========================================================================================= @classmethod def _getAllWrappedData(cls, parent: Spectrum) -> Tuple[ApiIntegralList, ...]: """get wrappedData (PeakLists) for all IntegralList children of parent Spectrum""" return parent._wrappedData.sortedIntegralLists() def _finaliseAction(self, action: str): """Subclassed to notify changes to associated integralListViews """ if not super()._finaliseAction(action): return try: if action in ['change']: for ilv in self.integralListViews: ilv._finaliseAction(action) except Exception as es: raise RuntimeError('Error _finalising integralListViews: %s' % str(es)) #========================================================================================= # CCPN functions #=========================================================================================
[docs] def findLimits(self, f=20, stdFactor=0.001): spectrum = self.spectrum x, y = np.array(spectrum.positions), np.array(spectrum.intensities) const = int(len(y) * 0.0039) y2 = signal.correlate(y, np.ones(const), mode='same') / const yy = y - y2 # if noiseThreshold is None: maxNL, minNL = estimateNoiseLevel1D(yy, f=f, stdFactor=stdFactor) intersectingLine = [maxNL] * len(x) limitsPairs = _getPeaksLimits(x, yy, intersectingLine) return limitsPairs, maxNL, minNL
[docs] def automaticIntegral1D(self, minimalLineWidth=0.01, deltaFactor=1.5, findPeak=False, noiseThreshold=None) -> List['Integral']: """ minimalLineWidth: an attempt to exclude noise. Below this threshold the area is discarded. noiseThreshold: value used to calculate the intersectingLine to get the peak limits """ # TODO: add excludeRegions option. Calculate Negative peak integral. # self._project.suspendNotification from ccpn.core.lib.PeakPickers.PeakPicker1D import _find1DMaxima from ccpn.core.PeakList import estimateNoiseLevel1D try: spectrum = self.spectrum if findPeak: peakList = spectrum.newPeakList() x, y = np.array(spectrum.positions), np.array(spectrum.intensities) const = int(len(y) * 0.0039) y2 = signal.correlate(y, np.ones(const), mode='same') / const yy = y-y2 if noiseThreshold is None: # maxNL, minNL = estimateNoiseLevel1D(yy, f=20, stdFactor=0.001) noiseThreshold, minNL = estimateNoiseLevel1D(yy, f=20, stdFactor=0.001) intersectingLine = [noiseThreshold] * len(x) # else: # intersectingLine = [noiseThreshold] * len(x) limitsPairs = _getPeaksLimits(x, y, intersectingLine) spectrum.noiseLevel = noiseThreshold integrals = [] for i in limitsPairs: minI, maxI = min(i), max(i) # aminI = minI - percentage(0.05, minI) #add a bit on each sides # amaxI = maxI + percentage(0.05, minI) # print(i, aminI,amaxI) lineWidth = abs(maxI - minI) if lineWidth: newIntegral = self.newIntegral(value=None, limits=[[minI, maxI], ]) newIntegral.baseline = noiseThreshold filteredX = np.where((x <= i[0]) & (x >= i[1])) y_filtered = spectrum.intensities[filteredX] x_filtered = filteredX[0].flatten() # filteredY = filteredY[1].compressed() if findPeak: # pick peaks and link to integral maxValues, minValues = _find1DMaxima(y_filtered, x_filtered, positiveThreshold=noiseThreshold, findNegative=False) if len(maxValues) > 1: #calculate centre of mass or # add to multiplet ?? positions = [] heights = [] numerator = [] for position, height in maxValues: positions.append(x[position]) heights.append(height) for p, h in zip(positions, heights): numerator.append(p * h) centerOfMass = sum(numerator) / sum(heights) newPeak = peakList.newPeak(ppmPositions=[centerOfMass, ], height=max(heights)) newIntegral.peak = newPeak newPeak.volume = newIntegral.value newPeak.lineWidths = (lineWidth,) else: for position, height in maxValues: newPeak = peakList.newPeak(ppmPositions=[float(x[position]), ], height=height) newIntegral.peak = newPeak newPeak.volume = newIntegral.value newPeak.lineWidths = (lineWidth,) if intersectingLine: newIntegral.baseline = intersectingLine[0] integrals.append(newIntegral) finally: # self._project.resumeNotification() pass return integrals
#=========================================================================================== # new'Object' and other methods # Call appropriate routines in their respective locations #===========================================================================================
[docs] @logCommand(get='self') def newIntegral(self, value: List[float] = None, comment: str = None, **kwds): """Create a new integral within an IntegralList. See the Integral class for details. Optional keyword arguments can be passed in; see Integral._newIntegral for details. :param value: (min, max) values in ppm for the integral :param comment: optional comment string :return: a new Integral instance """ from ccpn.core.Integral import _newIntegral # imported here to avoid circular imports return _newIntegral(self, value=value, comment=comment, **kwds)
#========================================================================================= # Connections to parents: #========================================================================================= @newObject(IntegralList) def _newIntegralList(self: Spectrum, title: str = None, comment: str = None, symbolStyle: str = None, symbolColour: str = None, textColour: str = None, meritColour: str = None, meritEnabled: bool = False, meritThreshold: float = None, lineColour: str = None, ) -> IntegralList: """ Create new IntegralList within Spectrum. See the IntegralList class for details. :param title: :param symbolColour: :param textColour: :param comment: :return: a new IntegralList attached to the spectrum. """ dd = {'name': title, 'details': comment, 'dataType': 'Integral'} if symbolStyle: dd['symbolStyle'] = symbolStyle if symbolColour: dd['symbolColour'] = symbolColour if textColour: dd['textColour'] = textColour apiDataSource = self._apiDataSource apiIntegralList = apiDataSource.newIntegralList(**dd) result = self._project._data2Obj.get(apiIntegralList) if result is None: raise RuntimeError('Unable to generate new IntegralList item') # set non-api attributes if meritColour is not None: result.meritColour = meritColour if meritEnabled is not None: result.meritEnabled = meritEnabled if meritThreshold is not None: result.meritThreshold = meritThreshold if lineColour is not None: result.lineColour = lineColour return result