"""
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Geerten Vuister $"
__dateModified__ = "$dateModified: 2022-03-08 22:20:58 +0000 (Tue, March 08, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
import itertools
import operator
import numpy as np
from functools import partial
from typing import Optional, Tuple, Union, Sequence, Any
from ccpn.core.lib.AxisCodeLib import _axisCodeMapIndices
from ccpn.util import Common as commonUtil
from ccpn.core._implementation.AbstractWrapperObject import AbstractWrapperObject
from ccpn.core.Project import Project
from ccpn.core.PeakList import PeakList, PARABOLICMETHOD
from ccpn.core.NmrAtom import NmrAtom
from ccpnmodel.ccpncore.api.ccp.nmr import Nmr
from ccpn.core.lib.peakUtils import _getPeakSNRatio, snapToExtremum as peakUtilsSnapToExtremum
from ccpn.util.decorators import logCommand
from ccpn.core.lib.ContextManagers import newObject, ccpNmrV3CoreSetter, \
undoBlock, undoBlockWithoutSideBar, undoStackBlocking, ccpNmrV3CoreUndoBlock
from ccpn.util.Logging import getLogger
from ccpn.util.Common import makeIterableList, isIterable
from ccpn.util.Constants import SCALETOLERANCE
from ccpn.core.NmrAtom import UnknownIsotopeCode
[docs]class Peak(AbstractWrapperObject):
"""Peak object, holding position, intensity, and assignment information
Measurements that require more than one NmrAtom for an individual assignment
(such as splittings, J-couplings, MQ dimensions, reduced-dimensionality
experiments etc.) are not supported (yet). Assignments can be viewed and set
either as a list of assignments for each dimension (dimensionNmrAtoms) or as a
list of all possible assignment combinations (assignedNmrAtoms)"""
#: Short class name, for PID.
shortClassName = 'PK'
# Attribute it necessary as subclasses must use superclass className
className = 'Peak'
_parentClass = PeakList
#: Name of plural link to instances of class
_pluralLinkName = 'peaks'
# the attribute name used by current
_currentAttributeName = 'peaks'
#: List of child classes.
_childClasses = []
# Qualified name of matching API class
_apiClassQualifiedName = Nmr.Peak._metaclass.qualifiedName()
# CCPN properties
@property
def _apiPeak(self) -> Nmr.Peak:
"""API peaks matching Peak"""
return self._wrappedData
@property
def _key(self) -> str:
"""id string - serial number converted to string."""
return str(self._wrappedData.serial)
@property
def serial(self) -> int:
"""serial number of Peak, used in Pid and to identify the Peak."""
return self._wrappedData.serial
@property
def _parent(self) -> Optional[PeakList]:
"""PeakList containing Peak."""
return self._project._data2Obj[self._wrappedData.peakList] \
if self._wrappedData.peakList in self._project._data2Obj else None
peakList = _parent
@property
def spectrum(self):
"""Convenience property to get the spectrum, equivalent to peak.peakList.spectrum
"""
return self.peakList.spectrum
@property
def chemicalShiftList(self):
"""Convenience property to get the spectrum, equivalent to peak.peakList.chemicalShiftList
"""
return self.peakList.chemicalShiftList
@property
def restraints(self) -> tuple:
"""Restraints corresponding to Peak"""
# placeholder, hotfixed later
pass
@property
def height(self) -> Optional[float]:
"""height of Peak."""
if self._wrappedData.height is None:
return None
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.height by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.height * scale
@height.setter
@logCommand(get='self', isProperty=True)
def height(self, value: Union[float, int, None]):
if not isinstance(value, (float, int, type(None))):
raise TypeError('height must be a float, integer or None')
elif value is not None and (value - value) != 0.0:
raise TypeError('height cannot be NaN or Infinity')
if value is None:
self._wrappedData.height = None
else:
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.height by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.height = None
else:
self._wrappedData.height = float(value) / scale
@property
def heightError(self) -> Optional[float]:
"""height error of Peak."""
if self._wrappedData.heightError is None:
return None
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.heightError by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.heightError * scale
@heightError.setter
@logCommand(get='self', isProperty=True)
def heightError(self, value: Union[float, int, None]):
if not isinstance(value, (float, int, type(None))):
raise TypeError('heightError must be a float, integer or None')
elif value is not None and (value - value) != 0.0:
raise TypeError('heightError cannot be NaN or Infinity')
if value is None:
self._wrappedData.heightError = None
else:
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.heightError by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.heightError = None
else:
self._wrappedData.heightError = float(value) / scale
@property
def volume(self) -> Optional[float]:
"""volume of Peak."""
if self._wrappedData.volume is None:
return None
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.volume by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.volume * scale
@volume.setter
@logCommand(get='self', isProperty=True)
def volume(self, value: Union[float, int, None]):
if not isinstance(value, (float, int, type(None))):
raise TypeError('volume must be a float, integer or None')
elif value is not None and (value - value) != 0.0:
raise TypeError('volume cannot be NaN or Infinity')
if value is None:
self._wrappedData.volume = None
else:
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.volume by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.volume = None
else:
self._wrappedData.volume = float(value) / scale
@property
def volumeError(self) -> Optional[float]:
"""volume error of Peak."""
if self._wrappedData.volumeError is None:
return None
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.volumeError by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
return self._wrappedData.volumeError * scale
@volumeError.setter
@logCommand(get='self', isProperty=True)
def volumeError(self, value: Union[float, int, None]):
if not isinstance(value, (float, int, type(None))):
raise TypeError('volumeError must be a float, integer or None')
elif value is not None and (value - value) != 0.0:
raise TypeError('volumeError cannot be NaN or Infinity')
if value is None:
self._wrappedData.volumeError = None
else:
scale = self.peakList.spectrum.scale
scale = scale if scale is not None else 1.0
if -SCALETOLERANCE < scale < SCALETOLERANCE:
getLogger().warning('Scaling {}.volumeError by minimum tolerance (±{})'.format(self, SCALETOLERANCE))
self._wrappedData.volumeError = None
else:
self._wrappedData.volumeError = float(value) / scale
@property
def figureOfMerit(self) -> Optional[float]:
"""figureOfMerit of Peak, between 0.0 and 1.0 inclusive."""
return self._wrappedData.figOfMerit
@figureOfMerit.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def figureOfMerit(self, value: float):
if self._wrappedData.figOfMerit == value:
return
self._wrappedData.figOfMerit = value
# recalculate the shifts
assigned = set(makeIterableList(self.assignments))
shifts = set(cs for nmrAt in assigned for cs in nmrAt.chemicalShifts if cs and not cs.isDeleted)
self._childActions.extend(sh._recalculateShiftValue for sh in shifts)
self._finaliseChildren.extend((sh, 'change') for sh in shifts)
@property
def annotation(self) -> Optional[str]:
"""Peak text annotation."""
return self._wrappedData.annotation
@annotation.setter
@logCommand(get='self', isProperty=True)
def annotation(self, value: Optional[str]):
if not isinstance(value, (str, type(None))):
raise ValueError("annotation must be a string or None")
else:
self._wrappedData.annotation = value
@property
def axisCodes(self) -> Tuple[str, ...]:
"""Spectrum axis codes in dimension order matching position."""
return self.spectrum.axisCodes
@property
def position(self) -> Tuple[float, ...]:
"""Peak position in ppm (or other relevant unit) in dimension order."""
return tuple(x.value for x in self._wrappedData.sortedPeakDims())
@position.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def position(self, value: Sequence):
# call api changes
shifts = set()
ff = self._project._data2Obj.get
for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
_old = peakDim.position # the current pointPosition, quick to get
peakDim.value = value[ii]
peakDim.realValue = None
# log any peak assignments that have moved in this axis
if peakDim.position != _old:
assigned = set([ff(pdc.resonance) for pdc in peakDim.mainPeakDimContribs if hasattr(pdc, 'resonance')])
shifts |= set(sh for nmrAt in assigned for sh in nmrAt.chemicalShifts)
self._childActions.extend(sh._recalculateShiftValue for sh in shifts)
self._finaliseChildren.extend((sh, 'change') for sh in shifts)
ppmPositions = position
# @property
# def ppmPositions(self) -> Tuple[float, ...]:
# """Peak position in ppm (or other relevant unit) in dimension order."""
# return tuple(x.value for x in self._wrappedData.sortedPeakDims())
#
# @ppmPositions.setter
# @logCommand(get='self', isProperty=True)
# @ccpNmrV3CoreSetter()
# def ppmPositions(self, value: Sequence):
# # call api changes
# for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
# peakDim.value = value[ii]
# peakDim.realValue = None
@property
def positionError(self) -> Tuple[Optional[float], ...]:
"""Peak position error in ppm (or other relevant unit)."""
return tuple(x.valueError for x in self._wrappedData.sortedPeakDims())
@positionError.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def positionError(self, value: Sequence):
for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
peakDim.valueError = value[ii]
@property
def pointPositions(self) -> Tuple[float, ...]:
"""Peak position in points."""
return tuple(x.position for x in self._wrappedData.sortedPeakDims())
@pointPositions.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def pointPositions(self, value: Sequence):
shifts = set()
ff = self._project._data2Obj.get
for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
_old = peakDim.position # the current pointPositions
peakDim.position = value[ii]
# log any peak assignments that have moved in this axis
if peakDim.position != _old:
assigned = set([ff(pdc.resonance) for pdc in peakDim.mainPeakDimContribs if hasattr(pdc, 'resonance')])
shifts |= set(sh for nmrAt in assigned for sh in nmrAt.chemicalShifts)
self._childActions.extend(sh._recalculateShiftValue for sh in shifts)
self._finaliseChildren.extend((sh, 'change') for sh in shifts)
@property
def boxWidths(self) -> Tuple[Optional[float], ...]:
"""The full width of the peak footprint in points for each dimension,
i.e. the width of the area that should be considered for integration, fitting, etc."""
return tuple(x.boxWidth for x in self._wrappedData.sortedPeakDims())
@boxWidths.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def boxWidths(self, value: Sequence):
for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
peakDim.boxWidth = value[ii]
@property
def lineWidths(self) -> Tuple[Optional[float], ...]:
"""Full-width-half-height of peak for each dimension, in Hz/ppm.
"""
return tuple(x.lineWidth for x in self._wrappedData.sortedPeakDims())
@lineWidths.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def lineWidths(self, value: Sequence):
for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
peakDim.lineWidth = value[ii]
# @property
# def ppmLineWidths(self) -> Tuple[Optional[float], ...]:
# """Full-width-half-height of peak for each dimension, in ppm."""
# return tuple(peakDim.lineWidth * peakDim.dataDim.valuePerPoint if peakDim.lineWidth is not None else None
# for peakDim in self._wrappedData.sortedPeakDims())
#
# @ppmLineWidths.setter
# @logCommand(get='self', isProperty=True)
# @ccpNmrV3CoreSetter()
# def ppmLineWidths(self, value: Sequence):
# for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
# peakDim.lineWidth = value[ii] / peakDim.dataDim.valuePerPoint if value[ii] is not None else None
ppmLineWidths = lineWidths
@property
def pointLineWidths(self) -> Tuple[Optional[float], ...]:
"""Full-width-half-height of peak for each dimension, in points.
"""
# currently assumes that internal storage is in ppm's; GWV thinks Hz????
result = []
for peakDim, valuePerPoint in zip(self._wrappedData.sortedPeakDims(), self.spectrum._valuePerPoints):
val = peakDim.lineWidth / valuePerPoint if peakDim.lineWidth is not None else None
result.append(val)
return tuple(result)
@pointLineWidths.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def pointLineWidths(self, value: Sequence):
for val, peakDim, valuePerPoint in zip(value,
self._wrappedData.sortedPeakDims(),
self.spectrum._valuePerPoints):
peakDim.lineWidth = val * valuePerPoint if val is not None else None
@property
def aliasing(self) -> Tuple[Optional[float], ...]:
"""Aliasing for the peak in each dimension.
Defined as integer number of spectralWidths added or subtracted along each dimension
"""
aliasing = []
for peakDim in self._wrappedData.sortedPeakDims():
axisReversed = -1
expDimRef = peakDim.dataDim.expDim.findFirstExpDimRef(serial=1)
if expDimRef:
axisReversed = -1 if expDimRef.isAxisReversed else 1
aliasing.append(axisReversed * peakDim.numAliasing)
return tuple(aliasing)
@aliasing.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def aliasing(self, value: Sequence):
if len(value) != len(self._wrappedData.sortedPeakDims()):
raise ValueError("Length of %s does not match number of dimensions." % str(value))
if not all(isinstance(dimVal, int) for dimVal in value):
raise ValueError("Aliasing values must be integer.")
# call api changes
shifts = set()
ff = self._project._data2Obj.get
for ii, peakDim in enumerate(self._wrappedData.sortedPeakDims()):
# log any peak assignments that have moved in this axis
if peakDim.numAliasing != -1 * value[ii]:
assigned = set([ff(pdc.resonance) for pdc in peakDim.mainPeakDimContribs if hasattr(pdc, 'resonance')])
peakDim.numAliasing = -1 * value[ii]
shifts |= set(sh for nmrAt in assigned for sh in nmrAt.chemicalShifts)
self._childActions.extend(sh._recalculateShiftValue for sh in shifts)
self._finaliseChildren.extend((sh, 'change') for sh in shifts)
@property
def dimensionNmrAtoms(self) -> Tuple[Tuple['NmrAtom', ...], ...]:
"""Peak dimension assignment - a tuple of tuples with the assigned NmrAtoms for each dimension.
One of two alternative views on the Peak assignment.
Example, for a 13C HSQC:
((<NA:A.127.LEU.HA>, <NA:A.127.LEU.HBX>, <NA:A.127.LEU.HBY>, <NA:A.127.LEU.HG>,
(<NA:A.127.LEU.CA>, <NA:A.127.LEU.CB>)
)
Assignments as a list of individual combinations is given in 'assignedNmrAtoms'.
Note that by setting dimensionAssignments you tel the program that all combinations are
possible - in the example that all four protons could be bound to either of the carbons
To (re)set the assignment for a single dimension, use the Peak.assignDimension method."""
result = []
for peakDim in self._wrappedData.sortedPeakDims():
mainPeakDimContribs = peakDim.mainPeakDimContribs
# Done this way as a quick way of sorting the values
mainPeakDimContribs = [x for x in peakDim.sortedPeakDimContribs() if x in mainPeakDimContribs]
data2Obj = self._project._data2Obj
dimResults = [data2Obj[pdc.resonance] for pdc in mainPeakDimContribs
if hasattr(pdc, 'resonance')]
result.append(tuple(sorted(dimResults)))
#
return tuple(result)
@property
def _dimensionNmrAtoms(self) -> Tuple[Tuple['NmrAtom', ...], ...]:
"""Transparent method to control notifiers"""
return self.dimensionNmrAtoms
@_dimensionNmrAtoms.setter
@ccpNmrV3CoreUndoBlock()
def _dimensionNmrAtoms(self, value: Sequence):
"""Assign by Dimensions
Ccpn Internal:used by assignDimension/dimensionNmrAtoms - not to be called elsewhere
Doesn't need undoBlock/CoreSetter as this is taken care of by calling method
"""
if not isinstance(value, Sequence):
raise ValueError("dimensionNmrAtoms must be sequence of list/tuples")
isotopeCodes = self.peakList.spectrum.isotopeCodes
apiPeak = self._wrappedData
dimResonances = []
for ii, atoms in enumerate(value):
if atoms is None:
dimResonances.append(None)
else:
isotopeCode = isotopeCodes[ii]
if isinstance(atoms, str):
raise ValueError("dimensionNmrAtoms cannot be set to a sequence of strings")
if not isinstance(atoms, Sequence):
raise ValueError("dimensionNmrAtoms must be sequence of list/tuples")
atoms = tuple(self.getByPid(x) if isinstance(x, str) else x for x in atoms)
resonances = tuple(x._wrappedData for x in atoms if x is not None)
if isotopeCode and isotopeCode != UnknownIsotopeCode:
# check for isotope match
for x in resonances:
if x.isotopeCode not in (isotopeCode, UnknownIsotopeCode, None):
msg = f"""IsotopeCodes mismatch between NmrAtom {x.name} and Spectrum.
Consider changing NmrAtom isotopeCode from {x.isotopeCode} to {isotopeCode}, None, or {UnknownIsotopeCode}
to avoid future warnings."""
getLogger().warning(msg) # don't raise errors. NmrAtoms are just labels and can be assigned to anything if user wants so.
dimResonances.append(resonances)
apiPeak.assignByDimensions(dimResonances)
def _recalculatePeakShifts(self, nmrResidues, shifts):
# update the assigned nmrAtom chemical shift values - notify the nmrResidues and chemicalShifts
for sh in shifts:
sh._recalculateShiftValue()
for nmr in nmrResidues:
nmr._finaliseAction('change')
for sh in shifts:
sh._finaliseAction('change')
@dimensionNmrAtoms.setter
@logCommand(get='self', isProperty=True)
def dimensionNmrAtoms(self, value: Sequence):
_pre = set(makeIterableList(self.assignedNmrAtoms))
_post = set(makeIterableList(value))
nmrResidues = set(nmr.nmrResidue for nmr in (_pre | _post))
shifts = list(set(cs for nmrAt in (_pre | _post) for cs in nmrAt.chemicalShifts))
newShifts = shifts.copy()
_thisNmrPids = self.spectrum.chemicalShiftList._getNmrAtomPids()
_pre = set(atm.pid for atm in _pre)
_post = set(atm.pid for atm in _post)
with undoBlock():
with undoStackBlocking() as addUndoItem:
addUndoItem(undo=partial(self._recalculatePeakShifts, nmrResidues, shifts))
# set the value
self._dimensionNmrAtoms = value
# add those that are not already in the list - otherwise recalculate
for nmrAtom in (_post - _pre - _thisNmrPids):
newShifts.append(self.spectrum.chemicalShiftList.newChemicalShift(nmrAtom=nmrAtom))
# update the chemicalShift value/valueError
self._recalculatePeakShifts(nmrResidues, newShifts)
with undoStackBlocking() as addUndoItem:
addUndoItem(redo=partial(self._recalculatePeakShifts, nmrResidues, newShifts))
@property
def assignedNmrAtoms(self) -> Tuple[Tuple[Optional['NmrAtom'], ...], ...]:
"""Peak assignment - a tuple of tuples of NmrAtom combinations.
(e.g. a tuple of triplets for a 3D spectrum).
One of two alternative views on the Peak assignment.
Missing assignments are entered as None.
Example, for 13H HSQC::
((<NA:A.127.LEU.HA>, <NA:A.127.LEU.CA>),
(<NA:A.127.LEU.HBX>, <NA:A.127.LEU.CB>),
(<NA:A.127.LEU.HBY>, <NA:A.127.LEU.CB>),
(<NA:A.127.LEU.HG>, None),)
To add a single assignment tuple, use the Peak.addAssignment method
See also dimensionNmrAtoms, which gives assignments per dimension."""
data2Obj = self._project._data2Obj
apiPeak = self._wrappedData
peakDims = apiPeak.sortedPeakDims()
mainPeakDimContribs = [sorted(x.mainPeakDimContribs, key=operator.attrgetter('serial'))
for x in peakDims]
result = []
for peakContrib in apiPeak.sortedPeakContribs():
allAtoms = []
peakDimContribs = peakContrib.peakDimContribs
for ii, peakDim in enumerate(peakDims):
nmrAtoms = [data2Obj.get(x.resonance) for x in mainPeakDimContribs[ii]
if x in peakDimContribs and hasattr(x, 'resonance')]
if not nmrAtoms:
nmrAtoms = [None]
allAtoms.append(nmrAtoms)
# NB this gives a list of tuples
# Remove all-None tuples
result.extend(tt for tt in itertools.product(*allAtoms)
if any(x is not None for x in tt))
# result += itertools.product(*allAtoms)
return tuple(sorted(result))
@property
def _assignedNmrAtoms(self) -> Tuple[Tuple[Optional['NmrAtom'], ...], ...]:
"""Transparent method to control notifiers"""
return self.assignedNmrAtoms
@_assignedNmrAtoms.setter
@ccpNmrV3CoreUndoBlock()
def _assignedNmrAtoms(self, value: Sequence):
"""Assign by Contributions
Ccpn Internal: used by assignedNmrAtoms - not to be called elsewhere
Doesn't need undoBlock/CoreSetter as this is taken care of by calling method
"""
if not isinstance(value, Sequence):
raise ValueError("assignedNmrAtoms must be set to a sequence of list/tuples")
isotopeCodes = tuple(None if x == UnknownIsotopeCode else x for x in self.peakList.spectrum.isotopeCodes)
apiPeak = self._wrappedData
peakDims = apiPeak.sortedPeakDims()
dimensionCount = len(peakDims)
# get resonance, all tuples and per dimension
resonances = []
for tt in value:
ll = dimensionCount * [None]
resonances.append(ll)
for ii, atom in enumerate(tt):
atom = self.getByPid(atom) if isinstance(atom, str) else atom
if isinstance(atom, NmrAtom):
resonance = atom._wrappedData
if isotopeCodes[ii] and resonance.isotopeCode not in (isotopeCodes[ii], UnknownIsotopeCode, None):
raise ValueError("NmrAtom %s, isotope %s, assigned to dimension %s must have isotope %s or %s"
% (atom, resonance.isotopeCode, ii + 1, isotopeCodes[ii], UnknownIsotopeCode))
ll[ii] = resonance
elif atom is not None:
raise TypeError('Error assigning NmrAtom %s to dimension %s' % (str(atom), ii + 1))
# store the currently attached nmrAtoms
_assigned = set(makeIterableList(self.assignedNmrAtoms))
# set assignments
apiPeak.assignByContributions(resonances)
@assignedNmrAtoms.setter
@logCommand(get='self', isProperty=True)
def assignedNmrAtoms(self, value: Sequence):
_pre = set(makeIterableList(self.assignedNmrAtoms))
_post = set(makeIterableList(value))
nmrResidues = set(nmr.nmrResidue for nmr in (_pre | _post))
shifts = list(set(cs for nmrAt in (_pre | _post) for cs in nmrAt.chemicalShifts))
newShifts = shifts.copy()
_thisNmrPids = self.spectrum.chemicalShiftList._getNmrAtomPids()
_pre = set(atm.pid for atm in _pre)
_post = set(atm.pid for atm in _post)
with undoBlock():
with undoStackBlocking() as addUndoItem:
addUndoItem(undo=partial(self._recalculatePeakShifts, nmrResidues, shifts))
# set the value
self._assignedNmrAtoms = value
# add those that are not already in the list - otherwise recalculate
for nmrAtom in (_post - _pre - _thisNmrPids):
newShifts.append(self.spectrum.chemicalShiftList.newChemicalShift(nmrAtom=nmrAtom))
# update the chemicalShift value/valueError
self._recalculatePeakShifts(nmrResidues, newShifts)
with undoStackBlocking() as addUndoItem:
addUndoItem(redo=partial(self._recalculatePeakShifts, nmrResidues, newShifts))
# alternativeNames
assignments = assignedNmrAtoms
assignmentsByDimensions = dimensionNmrAtoms
@property
def multiplets(self) -> Optional[Tuple[Any]]:
"""List of multiplets containing the Peak."""
return tuple([self._project._data2Obj[mt] for mt in self._wrappedData.sortedMultiplets()
if mt in self._project._data2Obj])
[docs] @logCommand(get='self')
def addAssignment(self, value: Sequence[Union[str, 'NmrAtom']]):
"""Add a peak assignment - a list of one NmrAtom or Pid for each dimension"""
if len(value) != self.peakList.spectrum.dimensionCount:
raise ValueError("Length of assignment value %s does not match peak dimensionality %s "
% (value, self.peakList.spectrum.dimensionCount))
# Convert to tuple and check for non-existing pids
ll = []
for val in value:
if isinstance(val, str):
vv = self.getByPid(val)
if vv is None:
raise ValueError("No NmrAtom matching string pid %s" % val)
else:
ll.append(vv)
else:
ll.append(val)
value = tuple(value)
assignedNmrAtoms = list(self.assignedNmrAtoms)
if value in assignedNmrAtoms:
self._project._logger.warning("Attempt to add already existing Peak Assignment: %s - ignored"
% value)
else:
assignedNmrAtoms.append(value)
self.assignedNmrAtoms = assignedNmrAtoms
[docs] @logCommand(get='self')
def assignDimension(self, axisCode: str, value: Union[Union[str, 'NmrAtom'],
Sequence[Union[str, 'NmrAtom']]] = None):
"""Assign dimension with axisCode to value (NmrAtom, or Pid or sequence of either, or None)."""
axisCodes = self.spectrum.axisCodes
try:
axis = axisCodes.index(axisCode)
except ValueError:
raise ValueError("axisCode %s not recognised" % axisCode)
if value is None:
value = []
elif isinstance(value, str):
value = [self.getByPid(value)]
elif isinstance(value, Sequence):
value = [(self.getByPid(x) if isinstance(x, str) else x) for x in value]
else:
value = [value]
dimensionNmrAtoms = list(self.dimensionNmrAtoms)
dimensionNmrAtoms[axis] = value
self.dimensionNmrAtoms = dimensionNmrAtoms
[docs] def getByAxisCodes(self, parameterName: str, axisCodes: Sequence[str] = None,
exactMatch: bool = False) -> list:
"""Return a list of values defined by parameterName in order defined by axisCodes (default order if None).
Perform a mapping if exactMatch=False (eg. 'H' to 'Hn')
:param parameterName: a str denoting a Spectrum dimensional attribute
:param axisCodes: a tuple or list of axisCodes
:param exactMatch: a boolean optional testing for an exact match with the instance axisCodes
:return: the values defined by parameterName in axisCode order
Related:
Use getByDimensions() for dimensions (1..dimensionCount) based access of dimensional parameters of the
Peak class.
"""
from ccpn.core.lib.SpectrumLib import _getParameterValues
if axisCodes is None:
dimensions = self.spectrum.dimensions
else:
dimensions = self.spectrum.orderByAxisCodes(self.spectrum.dimensions, axisCodes=axisCodes, exactMatch=exactMatch)
try:
newValues = _getParameterValues(self, parameterName,
dimensions=dimensions, dimensionCount=self.spectrum.dimensionCount)
except ValueError as es:
raise ValueError('%s.getByAxisCodes: %s' % (self.__class__.__name__, str(es)))
return newValues
[docs] def setByAxisCodes(self, parameterName: str, values: Sequence, axisCodes: Sequence[str] = None, exactMatch: bool = False) -> list:
"""Set attributeName to values in order defined by axisCodes (default order if None)
Perform a mapping if exactMatch=False (eg. 'H' to 'Hn')
:param parameterName: a str denoting a Spectrum dimensional attribute
:param values: an iterable with values
:param axisCodes: a tuple or list of axisCodes
:param exactMatch: a boolean optional testing for an exact match with the instance axisCodes
:return: a list of newly set values of parameterName (in default order)
Related:
Use setByDimensions() for dimensions (1..dimensionCount) based setting of dimensional parameters of the
Peak class.
"""
from ccpn.core.lib.SpectrumLib import _setParameterValues
if axisCodes is None:
dimensions = self.spectrum.dimensions
else:
dimensions = self.spectrum.orderByAxisCodes(self.spectrum.dimensions, axisCodes=axisCodes, exactMatch=exactMatch)
try:
newValues = _setParameterValues(self, parameterName, values,
dimensions=dimensions, dimensionCount=self.spectrum.dimensionCount)
except ValueError as es:
raise ValueError('%s.setByAxisCodes: %s' % (self.__class__.__name__, str(es)))
return newValues
[docs] def getByDimensions(self, parameterName: str, dimensions: Sequence[int] = None) -> list:
"""Return a list of values of Peak dimensional attribute parameterName in order defined
by dimensions (default order if None).
:param parameterName: a str denoting a Spectrum dimensional attribute
:param dimensions: a tuple or list of dimensions (1..dimensionCount)
:return: the values defined by parameterName in dimensions order
Related:
Use getByAxisCodes() for axisCode based access of dimensional parameters of the Spectrum class.
"""
from ccpn.core.lib.SpectrumLib import _getParameterValues
if dimensions is None:
dimensions = self.spectrum.dimensions
try:
newValues = _getParameterValues(self, parameterName,
dimensions=dimensions, dimensionCount=self.spectrum.dimensionCount)
except ValueError as es:
raise ValueError('%s.getByDimensions: %s' % (self.__class__.__name__, str(es)))
return newValues
[docs] def setByDimensions(self, parameterName: str, values: Sequence, dimensions: Sequence[int] = None) -> list:
"""Set Spectrum dimensional attribute parameterName to values in the order as defined by
dimensions (1..dimensionCount)(default order if None)
:param parameterName: a str denoting a Spectrum dimensional attribute
:param dimensions: a tuple or list of dimensions (1..dimensionCount)
:return: a list of newly set values of parameterName (in default order)
Related:
Use setByAxisCodes() for axisCode based setting of dimensional parameters of the Spectrum class.
"""
from ccpn.core.lib.SpectrumLib import _setParameterValues
if dimensions is None:
dimensions = self.spectrum.dimensions
try:
newValues = _setParameterValues(self, parameterName, values, dimensions=dimensions, dimensionCount=self.spectrum.dimensionCount)
except ValueError as es:
raise ValueError('%s.setByDimensions: %s' % (self.__class__.__name__, str(es)))
return newValues
@property
def clusterId(self):
"""Get/set the clusterId for the peak
"""
return self._wrappedData.clusterId
@clusterId.getter
def clusterId(self):
"""Get the clusterId for the peak
"""
if self._wrappedData.clusterId is None:
cid = int(self.pid.fields[-1])
self.clusterId = cid
return self._wrappedData.clusterId
@clusterId.setter
def clusterId(self, value):
if not isinstance(value, (int, type(None))):
raise ValueError('Peak.clusterId must be of type int >= 0, None')
if value is not None and value < 0:
raise ValueError('Peak.clusterId must be >= 0')
self._wrappedData.clusterId = value
#=========================================================================================
# Implementation functions
#=========================================================================================
@classmethod
def _getAllWrappedData(cls, parent: PeakList) -> Tuple[Nmr.Peak, ...]:
"""Get wrappedData (Peaks) for all Peak children of parent PeakList."""
return parent._wrappedData.sortedPeaks()
def _finaliseAction(self, action: str):
"""Subclassed to handle associated multiplets
"""
if not super()._finaliseAction(action):
return
# if this peak is changed or deleted then it's multiplets/integral need to CHANGE
# create required as undo may return peak to a multiplet list
if action in ['change', 'create', 'delete']:
for mt in self.multiplets:
mt._finaliseAction('change')
# NOTE:ED does integral need to be notified? - and reverse notifiers in multiplet/integral
[docs] def delete(self):
"""Delete a peak."""
assigned = tuple(() for _ in range(self.peakList.spectrum.dimensionCount))
with undoBlockWithoutSideBar():
self.dimensionNmrAtoms = assigned
self._delete()
def __str__(self):
"""Readable string representation;
"""
_digits = {'1H': 3, '15N': 2, '13C': 2, '19F': 3}
# _digits.get(iCode,2)
ppms = tuple(round(p, _digits.get(iCode, 2))
for p, iCode in zip(self.ppmPositions, self.spectrum.isotopeCodes))
return "<%s: @%r>" % (self.pid, ppms)
#=========================================================================================
# CCPN functions
#=========================================================================================
[docs] def isPartlyAssigned(self):
"""Whether peak is partly assigned."""
return any(self.dimensionNmrAtoms)
[docs] def isFullyAssigned(self):
"""Whether peak is fully assigned."""
return all(self.dimensionNmrAtoms)
[docs] @logCommand(get='self')
def copyTo(self, targetPeakList: PeakList, includeAllProperties: bool = True) -> 'Peak':
"""Make (and return) a copy of the Peak in targetPeakList.
IncludeAll, True to copy all properties from origin to target Peak. False will copy
only position and assignments (if available)"""
if includeAllProperties:
singleValueTags = ['height', 'volume', 'heightError', 'volumeError', 'figureOfMerit',
'annotation', 'comment', ]
dimensionValueTags = ['ppmPositions', 'positionError', 'boxWidths', 'lineWidths', ]
else:
singleValueTags = []
dimensionValueTags = ['ppmPositions', 'positionError']
peakList = self.peakList
dimensionCount = peakList.spectrum.dimensionCount
if dimensionCount < targetPeakList.spectrum.dimensionCount:
raise ValueError("Cannot copy %sD %s to %sD %s. Incompatible dimensionality."
% (dimensionCount, self.longPid,
targetPeakList.spectrum.dimensionCount, targetPeakList.longPid))
destinationAxisCodes = targetPeakList.spectrum.axisCodes
dimensionMapping = peakList.spectrum.getByAxisCodes('dimensions', destinationAxisCodes, exactMatch=False)
if None in dimensionMapping:
raise ValueError("%s axisCodes %s not compatible with targetSpectrum axisCodes %s"
% (self, peakList.spectrum.axisCodes, targetPeakList.spectrum.axisCodes))
with undoBlockWithoutSideBar():
params = dict((tag, getattr(self, tag)) for tag in singleValueTags)
for tag in dimensionValueTags:
value = self.getByDimensions(tag, dimensions=dimensionMapping)
params[tag] = value
newPeak = targetPeakList.newPeak(**params)
assignments = self.getByDimensions('assignedNmrAtoms', dimensionMapping)
if assignments:
newPeak.assignedNmrAtoms = assignments
return newPeak
[docs] def reorderValues(self, values, newAxisCodeOrder):
"""Reorder values in spectrum dimension order to newAxisCodeOrder
by matching newAxisCodeOrder to spectrum axis code order."""
return commonUtil.reorder(values, self._parent._parent.axisCodes, newAxisCodeOrder)
[docs] def getInAxisOrder(self, attributeName: str, axisCodes: Sequence[str] = None):
"""Get attributeName in order defined by axisCodes :
(default order if None)"""
if not hasattr(self, attributeName):
raise AttributeError('Peak object does not have attribute "%s"' % attributeName)
values = getattr(self, attributeName)
if axisCodes is None:
return values
else:
# change to order defined by axisCodes
return self.reorderValues(values, axisCodes)
[docs] def setInAxisOrder(self, attributeName: str, values: Sequence[Any], axisCodes: Sequence[str] = None):
"""Set attributeName from values in order defined by axisCodes
(default order if None)"""
if not hasattr(self, attributeName):
raise AttributeError('Peak object does not have attribute "%s"' % attributeName)
if axisCodes is not None:
# change values to the order appropriate for spectrum
values = self.reorderValues(values, axisCodes)
setattr(self, attributeName, values)
[docs] def snapToExtremum(self, halfBoxSearchWidth: int = 4, halfBoxFitWidth: int = 4,
minDropFactor: float = 0.1, fitMethod: str = PARABOLICMETHOD,
searchBoxMode=False, searchBoxDoFit=False):
"""Snap the Peak to the closest local extrema, if within range."""
peakUtilsSnapToExtremum(self, halfBoxSearchWidth=halfBoxSearchWidth, halfBoxFitWidth=halfBoxFitWidth,
minDropFactor=minDropFactor, fitMethod=fitMethod,
searchBoxMode=searchBoxMode, searchBoxDoFit=searchBoxDoFit)
# def fitPositionHeightLineWidths(self):
# """Set the position, height and lineWidth of the Peak."""
# LibPeak.fitPositionHeightLineWidths(self._apiPeak)
@property
def integral(self):
"""Return the integral attached to the peak."""
return self._project._data2Obj[self._wrappedData.integral] if self._wrappedData.integral else None
@integral.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
def integral(self, value: Union['Integral'] = None):
"""Link an integral to the peak.
The peak must belong to the spectrum containing the peakList.
:param integral: single integral."""
spectrum = self._parent.spectrum
if value:
from ccpn.core.Integral import Integral
if not isinstance(value, Integral):
raise TypeError('%s is not of type Integral' % value)
if value not in spectrum.integrals:
raise ValueError('%s does not belong to spectrum: %s' % (value.pid, spectrum.pid))
self._wrappedData.integral = value._wrappedData if value else None
# def _linkPeaks(self, peaks):
# """
# NB: this is needed for screening spectrumHits and peakHits. You might see peakCluster instead.
# Saves the peaks in _ccpnInternalData as pids
# """
# pids = [str(peak.pid) for peak in peaks if peak != self and isinstance(peak, Peak)]
# if isinstance(self._ccpnInternalData, dict):
#
# # a single write is required to the api to notify that a change has occurred,
# # this will prompt for a save of the v2 data
# tempCcpn = self._ccpnInternalData.copy()
# tempCcpn[self._linkedPeaksName] = pids
# self._ccpnInternalData = tempCcpn
# else:
# raise ValueError("Peak.linkPeaks: CCPN internal must be a dictionary")
#
# @property
# def _linkedPeaks(self):
# """
# NB: this is needed for screening spectrumHits and peakHits. You might see peakCluster instead.
# It returns a list of peaks belonging to other peakLists or spectra which are required to be linked to this particular peak.
# This functionality is not implemented in the model. Saves the Peak pids in _ccpnInternalData.
# :return: a list of peaks
# """
# pids = self._ccpnInternalData.get(self._linkedPeaksName) or []
# peaks = [self.project.getByPid(pid) for pid in pids if pid is not None]
# return peaks
@property
def signalToNoiseRatio(self):
"""
:return: float. Estimated Signal to Noise ratio based on the spectrum noiseLevel values.
SNratio = |factor*(height/DeltaNoise)|
height: peak height
DeltaNoise: spectrum noise levels
factor: multiplication factor. Default: 2.5
"""
return _getPeakSNRatio(self)
[docs] @logCommand(get='self')
def estimateVolume(self, volumeIntegralLimit=2.0):
"""Estimate the volume of the peak from a gaussian distribution.
The width of the volume integral in each dimension is the lineWidth (FWHM) * volumeIntegralLimit,
the default is 2.0 * FWHM of the peak.
:param volumeIntegralLimit: integral width as a multiple of lineWidth (FWHM)
"""
def sigma2fwhm(sigma):
"""Convert sigma to FWHM for gaussian distribution
"""
return sigma * np.sqrt(8 * np.log(2))
def fwhm2sigma(fwhm):
"""Convert FWHM to sigma for gaussian distribution
"""
return fwhm / np.sqrt(8 * np.log(2))
def make_gauss(N, sigma, mu, height):
"""Generate a gaussian distribution from given parameters
"""
k = height # 1.0 / (sigma * np.sqrt(2 * np.pi)) - to give unit area at infinite bounds
s = -1.0 / (2 * sigma * sigma)
return k * np.exp(s * (N - mu) * (N - mu))
lineWidths = self.lineWidths
if not lineWidths or None in lineWidths:
raise ValueError('cannot estimate volume, lineWidths not defined or contain None.')
if not self.height:
raise ValueError('cannot estimate volume, height not defined.')
# parameters for a unit height/sigma gaussian
sigmaX = 1.0
mu = 0.0
height = 1.0
numPoints = 39 # area estimate area < 1e-8 for this number of points
# calculate integral limit from FWHM - only need positive half
FWHM = sigma2fwhm(sigmaX)
lim = volumeIntegralLimit * FWHM / 2.0
xxSig = np.linspace(0, lim, numPoints)
vals = make_gauss(xxSig, sigmaX, mu, height)
area = 2.0 * np.trapz(vals, xxSig)
# note that negative height will give negative volume
vol = 1.0
for lw in lineWidths:
# multiply the values for the gaussian in each dimension
vol *= (area * (lw / FWHM))
self.volume = self.height * abs(vol)
# do I need to set the volume error?
# self.volumeError = 1e-8
[docs] def fit(self, fitMethod=None, halfBoxSearchWidth=2, keepPosition=False, iterations=10):
"""
Fit the peak to recalculate position and lineWidths.
Use peak.estimateVolume to recalculate the volume.
:param fitMethod: str, one of ['gaussian', 'lorentzian', 'parabolic']
Default: the fitting method defined in the general preferences.
If not given or not included in the available options, it uses the default.
:param halfBoxSearchWidth: int. Default: 2.
Used to increase the searching area limits from the initial position.
:param keepPosition: bool. Default: False.
if True, reset to the original position after applying the fitting method.
Height is calculated using spectrum.getHeight()
:param iterations: int. Default: 3.
How many times the fitting method will run before it converges.
:return: None.
"""
from ccpn.core.PeakList import PICKINGMETHODS
from ccpn.core.lib.ContextManagers import undoBlockWithoutSideBar, notificationEchoBlocking
if not fitMethod in PICKINGMETHODS:
fitMethod = self._project.application.preferences.general.peakFittingMethod
peak = self
peakList = peak.peakList
originalPosition = peak.position
lastLWsFound = []
consecutiveSameLWsCount = 0
maxSameLWsCount = 3 # if the same values are found in the last x iterations, then it breaks the loop.
with undoBlockWithoutSideBar():
with notificationEchoBlocking():
while iterations > 0 and consecutiveSameLWsCount <= maxSameLWsCount:
peakList.fitExistingPeaks([peak], fitMethod=fitMethod,
halfBoxSearchWidth=halfBoxSearchWidth, singularMode=True)
if keepPosition:
peak.position = originalPosition
peak.height = peakList.spectrum.getHeight(peak.ppmPositions)
if np.array_equal(lastLWsFound, peak.lineWidths):
consecutiveSameLWsCount += 1
else:
consecutiveSameLWsCount = 0
lastLWsFound = peak.lineWidths
iterations -= 1
getLogger().info('Peak fit completed for %s' % peak)
return
# def _checkAliasing(self):
# """Recalculate the aliasing range for all peaks in the parent spectrum
# """
# spectrum = self.peakList.spectrum
# alias = spectrum._getAliasingRange()
# if alias is not None:
# spectrum.aliasingRange = alias
#===========================================================================================
# new'Object' and other methods
# Call appropriate routines in their respective locations
#===========================================================================================
#=========================================================================================
# Connections to parents:
#=========================================================================================
@newObject(Peak)
def _newPeak(self: PeakList, height: float = None, volume: float = None,
heightError: float = None, volumeError: float = None,
figureOfMerit: float = 1.0, annotation: str = None, comment: str = None,
ppmPositions: Sequence[float] = (), position: Sequence[float] = None, positionError: Sequence[float] = (),
pointPositions: Sequence[float] = (), boxWidths: Sequence[float] = (),
lineWidths: Sequence[float] = (), ppmLineWidths: Sequence[float] = (), pointLineWidths: Sequence[float] = (),
) -> Peak:
"""Create a new Peak within a peakList
NB you must create the peak before you can assign it. The assignment attributes are:
- assignedNmrAtoms - A tuple of all (e.g.) assignment triplets for a 3D spectrum
- dimensionNmrAtoms - A tuple of tuples of assignments, one for each dimension
See the Peak class for details.
:param height: height of the peak (related attributes: volume, volumeError, lineWidths)
:param volume:
:param heightError:
:param volumeError:
:param figureOfMerit:
:param annotation:
:param comment: optional comment string
:param ppmPositions: peak position in ppm for each dimension (related attributes: positionError, pointPositions)
:param position: OLD: peak position in ppm for each dimension (related attributes: positionError, pointPositions)
:param positionError:
:param pointPositions:
:param boxWidths:
:param lineWidths:
:return: a new Peak instance.
"""
if position is not None:
ppmPositions = position # Backward compatibility
apiPeakList = self._apiPeakList
apiPeak = apiPeakList.newPeak(height=height, volume=volume,
heightError=heightError, volumeError=volumeError,
figOfMerit=figureOfMerit, annotation=annotation, details=comment)
result = self._project._data2Obj.get(apiPeak)
if result is None:
raise RuntimeError('Unable to generate new Peak item')
apiPeakDims = apiPeak.sortedPeakDims()
if ppmPositions:
for ii, peakDim in enumerate(apiPeakDims):
peakDim.value = ppmPositions[ii]
elif pointPositions:
pointCounts = result.spectrum.pointCounts
for ii, peakDim in enumerate(apiPeakDims):
# move the peak to the correct aliased position
alias = int((pointPositions[ii] - 1) // pointCounts[ii])
pos = float((pointPositions[ii] - 1) % pointCounts[ii]) + 1.0 # API position starts at 1
peakDim.numAliasing = alias
peakDim.position = pos
if positionError:
for ii, peakDim in enumerate(apiPeakDims):
peakDim.valueError = positionError[ii]
if boxWidths:
for ii, peakDim in enumerate(apiPeakDims):
peakDim.boxWidth = boxWidths[ii]
# currently lineWidths/ppmLineWidths are both in Hz/ppm
if lineWidths:
for ii, peakDim in enumerate(apiPeakDims):
peakDim.lineWidth = lineWidths[ii]
elif ppmLineWidths:
for ii, peakDim in enumerate(apiPeakDims):
peakDim.lineWidth = ppmLineWidths[ii]
elif pointLineWidths:
for peakDim, pointLineWidth in zip(apiPeakDims, pointLineWidths):
peakDim.lineWidth = (pointLineWidth * peakDim.dataDim.valuePerPoint) if pointLineWidth else None
result.height = height # use the method to store the unit-scaled value
result.volume = volume
result.heightError = heightError
result.volumeError = volumeError
return result
@newObject(Peak)
def _newPickedPeak(self: PeakList, pointPositions: Sequence[float] = None, height: float = None,
lineWidths: Sequence[float] = (), fitMethod: str = 'gaussian') -> Peak:
"""Create a new Peak within a peakList from a picked peak
See the Peak class for details.
:param height: height of the peak (related attributes: volume, volumeError, lineWidths)
:param pointPositions: peak position in points for each dimension (related attributes: positionError, pointPositions)
:param fitMethod: type of curve fitting
:param lineWidths:
:return: a new Peak instance.
"""
apiPeakList = self._apiPeakList
apiPeak = apiPeakList.newPeak()
result = self._project._data2Obj.get(apiPeak)
if result is None:
raise RuntimeError('Unable to generate new Peak item')
apiDataSource = self.spectrum._apiDataSource
apiDataDims = apiDataSource.sortedDataDims()
apiPeakDims = apiPeak.sortedPeakDims()
for i, peakDim in enumerate(apiPeakDims):
dataDim = apiDataDims[i]
if dataDim.className == 'FreqDataDim':
dataDimRef = dataDim.primaryDataDimRef
else:
dataDimRef = None
if dataDimRef:
peakDim.numAliasing = int(divmod(pointPositions[i], dataDim.numPointsOrig)[0])
peakDim.position = float(pointPositions[i] + 1 - peakDim.numAliasing * dataDim.numPointsOrig) # API position starts at 1
else:
peakDim.position = float(pointPositions[i] + 1)
if fitMethod and lineWidths and lineWidths[i] is not None:
peakDim.lineWidth = dataDim.valuePerPoint * lineWidths[i] # conversion from points to Hz
# apiPeak.height = apiDataSource.scale * height
# store the unit scaled value
apiPeak.height = height
return result
# Additional Notifiers:
#
# NB These API notifiers will be called for API peaks - which match both Peaks and Integrals
className = Nmr.PeakDim._metaclass.qualifiedName()
Project._apiNotifiers.append(
('_notifyRelatedApiObject', {'pathToObject': 'peak', 'action': 'change'}, className, ''),
)
for clazz in Nmr.AbstractPeakDimContrib._metaclass.getNonAbstractSubtypes():
className = clazz.qualifiedName()
# NB - relies on PeakDimContrib.peakDim.peak still working for deleted peak. Should work.
Project._apiNotifiers.extend((
('_notifyRelatedApiObject', {'pathToObject': 'peakDim.peak', 'action': 'change'},
className, 'postInit'),
('_notifyRelatedApiObject', {'pathToObject': 'peakDim.peak', 'action': 'change'},
className, 'delete'),
)
)
# EJB 20181122: moved to SpectrumReference
# Notify Peaks change when SpectrumReference changes
# (That means DataDimRef referencing information)
# SpectrumReference._setupCoreNotifier('change', AbstractWrapperObject._finaliseRelatedObject,
# {'pathToObject': 'spectrum.peaks', 'action': 'change'})