"""
Module documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-02-02 23:55:21 +0000 (Wed, February 02, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
from ccpn.core._implementation.AbstractWrapperObject import AbstractWrapperObject
from ccpn.core.Project import Project
from ccpn.core.IntegralList import IntegralList
from ccpn.core.Peak import Peak
from ccpnmodel.ccpncore.api.ccp.nmr.Nmr import Integral as ApiIntegral
from ccpnmodel.ccpncore.api.ccp.nmr.Nmr import PeakDim as ApiPeakDim
from typing import Optional, Tuple, Sequence, List, Union
import numpy as np
from scipy.integrate import trapz
from ccpn.util.decorators import logCommand
from ccpn.core.lib.ContextManagers import newObject, ccpNmrV3CoreSetter
from ccpn.util.Logging import getLogger
from ccpn.util.Constants import SCALETOLERANCE
LinkedPeaks = 'linkedPeaks'
[docs]class Integral(AbstractWrapperObject):
"""n-dimensional Integral, with integration region and value.
Includes fields for per-dimension values.
"""
#: Short class name, for PID.
shortClassName = 'IT'
# Attribute it necessary as subclasses must use superclass className
className = 'Integral'
_parentClass = IntegralList
#: Name of plural link to instances of class
_pluralLinkName = 'integrals'
# the attribute name used by current
_currentAttributeName = 'integrals'
#: List of child classes.
_childClasses = []
# Qualified name of matching API class - NB shared with Peak class
_apiClassQualifiedName = ApiIntegral._metaclass.qualifiedName()
# _baseline = None
_linkedPeakNotifier = None
_linkedPeaks = set()
# CCPN properties
@property
def _apiIntegral(self) -> ApiIntegral:
""" API integrals matching Integral"""
return self._wrappedData
@property
def _key(self) -> str:
"""id string - serial number converted to string"""
return str(self._wrappedData.serial)
@property
def serial(self) -> int:
"""serial number of Integral, used in Pid and to identify the Integral. """
return self._wrappedData.serial
@property
def _parent(self) -> IntegralList:
"""IntegralList containing Integral."""
return self._project._data2Obj[self._wrappedData.integralList]
integralList = _parent
@property
def value(self) -> Optional[float]:
"""value of Integral"""
if self._wrappedData.volume is None:
return None
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.value by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.volume * scale
@value.setter
@logCommand(get='self', isProperty=True)
def value(self, value: Union[float, int, None]):
if not isinstance(value, (float, int, type(None))):
raise TypeError('value must be a float, integer or None')
elif value is not None and (value - value) != 0.0:
raise TypeError('value cannot be NaN or Infinity')
if value is None:
self._wrappedData.volume = None
else:
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.value by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.volume = None
else:
self._wrappedData.volume = float(value) / scale
@property
def valueError(self) -> Optional[float]:
"""value error of Integral"""
if self._wrappedData.volumeError is None:
return None
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.valueError by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.volumeError * scale
@valueError.setter
@logCommand(get='self', isProperty=True)
def valueError(self, value: Union[float, int, None]):
if not isinstance(value, (float, int, type(None))):
raise TypeError('valueError must be a float, integer or None')
elif value is not None and (value - value) != 0.0:
raise TypeError('valueError cannot be NaN or Infinity')
if value is None:
self._wrappedData.volumeError = None
else:
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.valueError by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.volumeError = None
else:
self._wrappedData.volumeError = float(value) / scale
@property
def bias(self) -> float:
"""Baseplane offset used in calculating integral value"""
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.bias by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.offset * scale
@bias.setter
@logCommand(get='self', isProperty=True)
def bias(self, value: Union[float, int]):
if not isinstance(value, (float, int)):
raise TypeError('bias must be a float or integer')
elif (value - value) != 0.0:
raise TypeError('bias cannot be NaN or Infinity')
value = float(value)
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.bias by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.offset = 0.0
else:
self._wrappedData.offset = value / scale
@property
def figureOfMerit(self) -> Optional[float]:
"""figureOfMerit of Integral, between 0.0 and 1.0 inclusive."""
return self._wrappedData.figOfMerit
@figureOfMerit.setter
@logCommand(get='self', isProperty=True)
def figureOfMerit(self, value: float):
self._wrappedData.figOfMerit = value
@property
def offset(self) -> float:
"""offset of Integral"""
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.offset by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.offset * scale
@offset.setter
@logCommand(get='self', isProperty=True)
def offset(self, value: Union[float, int]):
if not isinstance(value, (float, int)):
raise TypeError('offset must be a float or integer')
elif (value - value) != 0.0:
raise TypeError('offset cannot be NaN or Infinity')
value = float(value)
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.offset by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.offset = 0.0
else:
self._wrappedData.offset = value / scale
# NOTE:ED - check, baseline is currently using offset in the model
@property
def baseline(self) -> float:
"""baseline of Integral"""
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.baseline by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.offset * scale
@baseline.setter
@logCommand(get='self', isProperty=True)
def baseline(self, value: Union[float, int]):
if not isinstance(value, (float, int)):
raise TypeError('baseline must be a float or integer')
elif (value - value) != 0.0:
raise TypeError('baseline cannot be NaN or Infinity')
value = float(value)
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.baseline by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.offset = 0.0
else:
self._wrappedData.offset = value / scale
@property
def constraintWeight(self) -> Optional[float]:
"""constraintWeight of Integral"""
return self._wrappedData.constraintWeight
@constraintWeight.setter
@logCommand(get='self', isProperty=True)
def constraintWeight(self, value: float):
self._wrappedData.constraintWeight = value
@property
def slopes(self) -> Optional[List[float]]:
"""slope (in dimension order) used in calculating integral value
The slope is defined as the intensity in point i+1 minus the intensity in point i"""
# return [x.slope for x in self._wrappedData.sortedPeakDims()]
if self._wrappedData.slopes is None:
return None
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.slopes by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return [slope * scale for slope in self._wrappedData.slopes]
@slopes.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def slopes(self, value: Union[List[float], Tuple[float], None] = None):
if not isinstance(value, (list, tuple, type(None))):
raise TypeError('slopes must be a None or list/tuple of floats - {}'.format(value))
if value and not all(isinstance(sl, float) for sl in value):
raise TypeError('slopes must be a None or list/tuple of floats - {}'.format(value))
if value is None:
self._wrappedData.slopes = None
else:
scale = self.integralList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.slopes by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.slopes = None
else:
self._wrappedData.slopes = [sl / scale for sl in value]
# peakDims = self._wrappedData.sortedPeakDims()
# if len(value) == len(peakDims):
# for tt in zip(peakDims, value):
# tt[0].slope = tt[1]
# else:
# raise ValueError("The slopes value %s does not match the dimensionality of the spectrum, %s"
# % value, len(peakDims))
@property
def annotation(self) -> Optional[str]:
"""Integral text annotation"""
return self._wrappedData.annotation
@annotation.setter
@logCommand(get='self', isProperty=True)
def annotation(self, value: str):
self._wrappedData.annotation = value
@property
def axisCodes(self) -> Tuple[str, ...]:
"""Spectrum axis codes in dimension order matching position."""
return self.integralList.spectrum.axisCodes
@property
def limits(self) -> List[Tuple[float, float]]:
"""Integration limits in axis value (ppm), per dimension, with lowest
ppm value first
:return list of (low_ppm, high_ppm) tuples
"""
return self._wrappedData.limits
@limits.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def limits(self, value):
self._wrappedData.limits = value
# # automatically calculates Volume given the limits for 1Ds
# spectrum = self.integralList.spectrum
#
# if spectrum.dimensionCount == 1:
# for ii in range(spectrum.dimensionCount):
# limits = value[ii] if value and len(value) > ii else ()
# if len(limits) == 2:
#
# if spectrum.intensities is not None and spectrum.intensities.size != 0:
# limit1, limit2 = limits
# x = spectrum.positions
# index01 = np.where((x <= limit2) & (x >= limit1))
# values = spectrum.intensities[index01]
# self.value = float(trapz(values))
#
# # small change, only calculate if there is a peak
# if self.peak:
# self.peak.volume = self.value
@property
def pointLimits(self) -> List[Tuple[float, float]]:
return self._wrappedData.pointLimits
# """Integration limits in points, per dimension, with lowest value first"""
# result = []
# for peakDim in self._wrappedData.sortedPeakDims():
# position = peakDim.position
# halfWidth = 0.5 * (peakDim.boxWidth or 0)
# result.append(position - halfWidth, position + halfWidth)
# #
# return result
@pointLimits.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def pointLimits(self, value):
self._wrappedData.pointLimits = value
# peakDims = self._wrappedData.sortedPeakDims()
# if len(value) == len(peakDims):
# for ii, peakDim in enumerate(peakDims):
# if None in value[ii]:
# peakDim.position = None
# peakDim.boxWidth = None
# else:
# limit1, limit2 = value[ii]
# peakDim.position = 0.5 * (limit1 + limit2)
# peakDim.boxWidth = abs(limit1 - limit2)
# else:
# raise ValueError("The slopes value %s does not match the dimensionality of the spectrum, %s"
# % value, len(peakDims))
#=========================================================================================
# Implementation functions
#=========================================================================================
@classmethod
def _getAllWrappedData(cls, parent: IntegralList) -> Tuple[ApiIntegral, ...]:
"""get wrappedData (Integrals) for all Integral children of parent IntegralList"""
return parent._wrappedData.sortedIntegrals()
@property
def _1Dregions(self):
"""
:return:baseline of the integral, x regions and y regions in separate arrays
"""
# baseline = self.baseline
# NOTE:ED - now using offset in the model, slope will determine the angle of the baseline
# calculate slope automatically?
baseline = self.baseline
for i in self.limits:
x = self.integralList.spectrum.positions
y = self.integralList.spectrum.intensities
xRegions = np.where((x <= max(i)) & (x >= min(i)))
for xRegion in xRegions:
if baseline is not None:
# try:
# baseline = min(y[xRegion])
return (baseline, x[xRegion], y[xRegion])
# except Exception as es:
# # TODO:Luca check empty list error
# pass
# should be just one for 1D
return []
#=========================================================================================
# CCPN functions
#=========================================================================================
@property
def peak(self):
"""The peak attached to the integral.
"""
return self._project._data2Obj[self._wrappedData.peak] if self._wrappedData.peak else None
@peak.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def peak(self, peak: Peak = None):
"""link a peak to the integral
The peak must belong to the spectrum containing the integralList.
:param peak: single peak
"""
spectrum = self._parent.spectrum
peak = self.project.getByPid(peak) if isinstance(peak, str) else peak
if peak:
if not isinstance(peak, Peak):
raise TypeError('%s is not of type Peak' % peak)
if peak not in spectrum.peaks:
raise ValueError('%s does not belong to spectrum: %s' % (peak.pid, spectrum.pid))
self._wrappedData.peak = peak._wrappedData if peak else None
#===========================================================================================
# new'Object' and other methods
# Call appropriate routines in their respective locations
#===========================================================================================
#=========================================================================================
# Connections to parents:
#=========================================================================================
@newObject(Integral)
def _newIntegral(self: IntegralList,
value: List[float] = None, valueError: List[float] = None, bias: float = 0,
offset: float = None, constraintWeight: float = None,
figureOfMerit: float = 1.0, annotation: str = None, comment: str = None,
limits: Sequence[Tuple[float, float]] = (), slopes: List[float] = None,
pointLimits: Sequence[Tuple[float, float]] = ()
) -> Integral:
"""Create new Integral within IntegralList
See the Integral class for details.
:param value:
:param valueError:
:param bias:
:param offset:
:param constraintWeight:
:param figureOfMerit:
:param annotation:
:param comment:
:param limits:
:param slopes:
:param pointLimits:
:return a new Integral instance.
"""
dd = {'volume': value, 'volumeError': valueError, 'offset': offset, 'slopes': slopes,
'figOfMerit': figureOfMerit, 'constraintWeight': constraintWeight,
'annotation': annotation, 'details': comment,
'limits': limits, 'pointLimits': pointLimits}
if not constraintWeight: del dd['constraintWeight']
if not offset: del dd['offset']
apiParent = self._apiIntegralList
apiIntegral = apiParent.newIntegral(**dd)
result = self._project._data2Obj.get(apiIntegral)
if result is None:
raise RuntimeError('Unable to generate new Integral item')
result.limits = limits #needs to fire the first time for automatic calculation of the value
return result
# Integral._parentClass.newIntegral = _newIntegral
# del _newIntegral
# def _factoryFunction(project:Project, wrappedData:ApiIntegral) -> AbstractWrapperObject:
# """create Peak or Integral from API Peak"""
# if wrappedData.peakList.dataType == 'Peak':
# return Peak(project, wrappedData)
# elif wrappedData.peakList.dataType == 'Integral':
# return Integral(project, wrappedData)
# else:
# raise ValueError("API Peak object has illegal parent dataType: %s. Must be 'Peak' or 'Integral"
# % wrappedData.dataType)
#
#
# Integral._factoryFunction = staticmethod(_factoryFunction)
# Peak._factoryFunction = staticmethod(_factoryFunction)
# Additional Notifiers:
# NB API level notifiers are defined in the Peak file for API Peaks
# They will have the same effect for integrals
Project._apiNotifiers.append(
('_notifyRelatedApiObject', {'pathToObject': 'peak.integral', 'action': 'change'},
ApiPeakDim._metaclass.qualifiedName(), '')
)