# Source code for ccpn.core.Spectrum

"""Spectrum  class.
Maintains the parameters of a spectrum, including per-dimension/axis values.
Values that are not defined for a given dimension (e.g. sampled dimensions) are given as None.

Dimension identifiers run from 1 to the number of dimensions, i.e. dimensionCount (e.g. 1,2,3 for a 3D).
In CCPN data the preferred convention is to have the acquisition dimension as dimension 1.

Axes identifiers run from 0 to the dimensionCount-1 (e.g. 0,1,2 for a 3D)
Per-axis values are given in the order data are stored in the spectrum file

The axisCodes are used as an alternative axis identifier. These are unique strings that typically
reflect the isotope on the relevant axis.
By default upon loading a new spectrum, the axisCodes are derived from the isotopeCodes that define
the experimental data for each dimension.
They can match the dimension identifiers in the reference experiment templates, linking a dimension
to the correct reference experiment dimension.
They are also used to automatically map spectrum display-axes between different spectra on a first
character basis.
Axes that are linked by a one-bond magnetisation transfer can be given a lower-case suffix
indicating the nucleus they are bound to. Duplicate axis names should be distinguished by a numerical suffix.
The rules are best illustrated by example:

Experiment                          axisCodes

15N-NOESY-HSQC OR 15N-HSQC-NOESY:   Hn, Nh, H
4D HCCH-TOCSY                       Hc, Ch, Hc1, Ch1
HNCA/CB                             H, N, C
HNCO                                H, N, CO
HCACO                               H, CA, CO
3D proton NOESY-TOCSY               H, H1, H2

1D Bromine NMR                      Br
19F-13C-HSQC                        Fc, Cf

Useful reordering methods exist to get dimensional/axis parameters in a particular order, i.e.:
getByDimension(), setByDimension(), getByAxisCode(), setByAxisCode()
See doc strings of these methods for detailed documentation

"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Luca Mureddu $"
__dateModified__ = "$dateModified: 2022-04-04 14:35:52 +0100 (Mon, April 04, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

from typing import Sequence, Tuple, Optional, Union, List
from functools import partial
from itertools import permutations
import numpy

from ccpnmodel.ccpncore.api.ccp.nmr import Nmr

from ccpn.core._implementation.AbstractWrapperObject import AbstractWrapperObject
from ccpn.core._implementation.SpectrumTraits import SpectrumTraits
from ccpn.core._implementation.SpectrumData import SliceData, PlaneData, RegionData

from ccpn.core.Project import Project
from ccpn.core.lib import Pid

import ccpn.core.lib.SpectrumLib as specLib
from ccpn.core.lib.SpectrumLib import MagnetisationTransferTuple, _getProjection, getDefaultSpectrumColours
from ccpn.core.lib.SpectrumLib import _includeInDimensionalCopy, _includeInCopy, _includeInCopyList, \
    checkSpectrumPropertyValue, _setDefaultAxisOrdering

from ccpn.core.lib.ContextManagers import \
    newObject, deleteObject, ccpNmrV3CoreSimple, \
    undoStackBlocking, renameObject, undoBlock, notificationBlanking, \
    ccpNmrV3CoreSetter, inactivity, undoBlockWithoutSideBar

from ccpn.core.lib.DataStore import DataStore, DataStoreTrait

from ccpn.core.lib.SpectrumDataSources.SpectrumDataSourceABC import \
    getDataFormats, getSpectrumDataSource, checkPathForSpectrumFormats, DataSourceTrait
from ccpn.core.lib.SpectrumDataSources.EmptySpectrumDataSource import EmptySpectrumDataSource
from ccpn.core.lib.SpectrumDataSources.Hdf5SpectrumDataSource import Hdf5SpectrumDataSource

from ccpn.core.lib.Cache import cached

from ccpn.core.lib.PeakPickers.PeakPickerABC import PeakPickerTrait

from ccpn.util.traits.CcpNmrJson import CcpNmrJson, jsonHandler
from ccpn.util.traits.CcpNmrTraits import Int, Float, Instance, Any
from ccpn.core.lib.SpectrumLib import SpectrumDimensionTrait

from ccpn.framework.PathsAndUrls import CCPN_STATE_DIRECTORY, CCPN_SPECTRA_DIRECTORY
from ccpn.framework.Application import getApplication

from ccpn.util.Constants import SCALETOLERANCE
from ccpn.util.Common import isIterable, _getObjectsByPids
from ccpn.core.lib.AxisCodeLib import getAxisCodeMatch
from ccpn.util.Logging import getLogger
from ccpn.util.decorators import logCommand, singleton
from ccpn.util.Path import Path, aPath

# defined here too as imported from Spectrum throughout the code base
MAXALIASINGRANGE = specLib.MAXALIASINGRANGE

#=========================================================================================
# Spectrum class
#=========================================================================================

from ccpn.core._implementation.updates.update_3_0_4 import _updateSpectrum_3_0_4_to_3_1_0
from ccpn.core._implementation.Updater import updateObject, UPDATE_POST_PROJECT_INITIALISATION

[docs]@updateObject(fromVersion = '3.0.4', toVersion = '3.1.0', updateFunction =_updateSpectrum_3_0_4_to_3_1_0, updateMethod = UPDATE_POST_PROJECT_INITIALISATION ) class Spectrum(AbstractWrapperObject): """A Spectrum object contains all the stored properties of an NMR spectrum, as well as the path to the NMR (binary) data file. The Spectrum object has methods to get the binary data as SpectrumData (i.e. numpy.ndarray) objects. """ #----------------------------------------------------------------------------------------- #: Short class name, for PID. shortClassName = 'SP' # Attribute it necessary as subclasses must use superclass className className = 'Spectrum' _parentClass = Project #: Name of plural link to instances of class _pluralLinkName = 'spectra' #: List of child classes. _childClasses = [] # Qualified name of matching API class _apiClassQualifiedName = Nmr.DataSource._metaclass.qualifiedName() #----------------------------------------------------------------------------------------- # 'local' definition of MAXDIM; defining defs in SpectrumLib to prevent circular imports MAXDIM = specLib.MAXDIM # 8 # Maximum dimensionality #----------------------------------------------------------------------------------------- # Internal NameSpace and other definitions #----------------------------------------------------------------------------------------- # Key for storing the dataStore info in the Ccpn internal parameter store _DATASTORE_KEY = '_dataStore' _REFERENCESUBSTANCESCACHE = '_referenceSubstances' _AdditionalAttribute = 'AdditionalAttribute' _ReferenceSubstancesPids = '_ReferenceSubstancesPids' _REFERENCESUBSTANCES = 'referenceSubstances' _INCLUDEPOSITIVECONTOURS = 'includePositiveContours' _INCLUDENEGATIVECONTOURS = 'includeNegativeContours' _PREFERREDAXISORDERING = '_preferredAxisOrdering' _SERIESITEMS = '_seriesItems' _DISPLAYFOLDEDCONTOURS = 'displayFoldedContours' _NEGATIVENOISELEVEL = 'negativeNoiseLevel' 
#----------------------------------------------------------------------------------------- # Attributes of the data structure (incomplete?) #----------------------------------------------------------------------------------------- @property def peakLists(self) -> list: """STUB: hot-fixed later""" return [] @property def multipletLists(self) -> list: """STUB: hot-fixed later""" return [] @property def integralLists(self) -> list: """STUB: hot-fixed later""" return [] @property def spectrumViews(self) -> tuple: """Return a tuple of SpectrumView instances associated with this spectrum""" specViews = [self._project._data2Obj.get(y) for x in self._wrappedData.sortedSpectrumViews() for y in x.sortedStripSpectrumViews()] return tuple(specViews) @property def chemicalShiftList(self): """STUB: hot-fixed later""" return None @property def spectrumReferences(self) -> list: """list of spectrumReferences objects STUB: hot-fixed later """ return [] @property def spectrumDimensions(self) -> tuple: """:return A tuple with the spectrum dimensions (== SpectrumReference or PseudoDimension) instances """ if self._spectrumDimensions is None: data2obj = self.project._data2Obj dataDims = [] for dim in self._wrappedData.sortedDataDims(): if dim.className == 'FreqDataDim': dataDims.append(list(dim.dataDimRefs)[0]) else: dataDims.append(dim) sDims = [data2obj.get(dim) for dim in dataDims] self._spectrumDimensions = tuple(sDims) return tuple(self._spectrumDimensions) @property def spectrumHits(self) -> list: """STUB: hot-fixed later""" return [] @property def sample(self): """STUB: hot-fixed later""" return None # Inherited from AbstractWrapperObject # @property # def project(self) -> 'Project': # """The Project (root)containing the object.""" # return self._project @property def _parent(self) -> Project: """Parent (containing) object.""" return self._project #----------------------------------------------------------------------------------------- def __init__(self, project: Project, 
wrappedData: Nmr.DataSource): # super().__init__(project, wrappedData) # CcpNmrJson.__init__(self) AbstractWrapperObject.__init__(self, project, wrappedData) self._spectrumTraits = SpectrumTraits(spectrum=self) # 1D data references self._intensities = None self._positions = None self._spectrumDimensions = None # A tuple of SpectrumReferences instances; set once and retained for speed self.doubleCrosshairOffsets = self.dimensionCount * [0] # TBD: do we need this to be a property? self.showDoubleCrosshair = False self._scaleChanged = False #----------------------------------------------------------------------------------------- # end __init__ #----------------------------------------------------------------------------------------- # References to DataStore / DataSource instances for filePath manipulation and (binary) # data reading; @property def _dataStore(self): """A DataStore instance encoding the filePath and dataFormat of the (binary) spectrum data. None indicates no spectrum filePile path has been defined """ return self._spectrumTraits.dataStore @property def dataSource(self): """A SpectrumDataSource instance for reading (writing) of the (binary) spectrum data. None indicates no valid spectrum data file has been defined """ return self._spectrumTraits.dataSource @property def peakPicker(self): """A PeakPicker instance for region picking in this spectrum. 
None indicates no valid PeakPicker has been defined """ # GWV: this should not happen; a default is set on restore or newSpectrum # if self._spectrumTraits.peakPicker is None: # if (_peakPicker := self._getPeakPicker()) is not None: # self.peakPicker = _peakPicker return self._spectrumTraits.peakPicker @peakPicker.setter def peakPicker(self, peakPicker): """Set the current peakPicker or deassign when None """ if peakPicker and peakPicker.spectrum != self: raise ValueError(f'Invalid peakPicker: already linked to {peakPicker.spectrum}') if peakPicker: with undoBlockWithoutSideBar(): # set the current peakPicker self._spectrumTraits.peakPicker = peakPicker # automatically store in the spectrum CCPN internal store peakPicker._storeAttributes() getLogger().debug('Setting peakPicker to %s' % peakPicker) else: with undoBlockWithoutSideBar(): # clear the current peakPicker if self._spectrumTraits.peakPicker is not None: self._spectrumTraits.peakPicker._detachFromSpectrum() self._spectrumTraits.peakPicker = None getLogger().debug('Clearing current peakPicker') #----------------------------------------------------------------------------------------- # Spectrum properties #----------------------------------------------------------------------------------------- @property def name(self) -> str: """short form of name, used for id""" return self._wrappedData.name @name.setter def name(self, value: str): """set name of Spectrum.""" self.rename(value) @property def dimensionCount(self) -> int: """Number of dimensions in spectrum""" return self._wrappedData.numDim @property def dimensions(self) -> tuple: """Convenience: tuple (len dimensionCount) with the dimension integers (1-based); e.g. (1,2,3,..). Useful for mapping in axisCodes order: eg: self.getByAxisCodes('dimensions', ['N','C','H']) """ return tuple(range(1, self.dimensionCount+1)) @property def dimensionIndices(self) -> tuple: """Convenience: tuple (len dimensionCount) with the dimension indices (0-based); e.g. 
(0,1,2,3). Useful for mapping in axisCodes order: eg: self.getByAxisCodes('dimensionIndices', ['N','C','H']) """ return tuple(range(0, self.dimensionCount)) # legacy axisIndices = dimensionIndices @property def dimensionTriples(self) -> tuple: """Convenience: return a tuple of triples (dimensionIndex, axisCode, dimension) for each dimension Useful for iterating over axis codes; eg in an H-N-CO ordered spectrum for dimIndex, axisCode, dimension in self.getByAxisCodes('dimensionTriples', ('N','C','H'), exactMatch=False) would yield: (1, 'N', 2) (2, 'CO', 3) (0, 'H', 1) """ return tuple(z for z in zip(self.dimensionIndices, self.axisCodes, self.dimensions)) @property @_includeInCopy def positiveContourCount(self) -> int: """number of positive contours to draw""" return self._wrappedData.positiveContourCount @positiveContourCount.setter @logCommand(get='self', isProperty=True) def positiveContourCount(self, value): self._wrappedData.positiveContourCount = value @property @_includeInCopy def positiveContourBase(self) -> float: """base level of positive contours""" return self._wrappedData.positiveContourBase @positiveContourBase.setter @logCommand(get='self', isProperty=True) def positiveContourBase(self, value): self._wrappedData.positiveContourBase = value @property @_includeInCopy def positiveContourFactor(self) -> float: """level multiplier for positive contours""" return self._wrappedData.positiveContourFactor @positiveContourFactor.setter @logCommand(get='self', isProperty=True) def positiveContourFactor(self, value): self._wrappedData.positiveContourFactor = value @property @_includeInCopy def positiveContourColour(self) -> str: """colour of positive contours""" return self._wrappedData.positiveContourColour @positiveContourColour.setter @logCommand(get='self', isProperty=True) def positiveContourColour(self, value): self._wrappedData.positiveContourColour = value @property @_includeInCopy def includePositiveContours(self): """Include flag for the positive 
contours """ result = self._getInternalParameter(self._INCLUDEPOSITIVECONTOURS) if result is None: # default to True return True return result @includePositiveContours.setter @logCommand(get='self', isProperty=True) def includePositiveContours(self, value: bool): """Include flag for the positive contours """ if not isinstance(value, bool): raise ValueError("Spectrum.includePositiveContours: must be True/False") self._setInternalParameter(self._INCLUDEPOSITIVECONTOURS, value) @property @_includeInCopy def negativeContourCount(self) -> int: """number of negative contours to draw""" return self._wrappedData.negativeContourCount @negativeContourCount.setter @logCommand(get='self', isProperty=True) def negativeContourCount(self, value): self._wrappedData.negativeContourCount = value @property @_includeInCopy def negativeContourBase(self) -> float: """base level of negative contours""" return self._wrappedData.negativeContourBase @negativeContourBase.setter @logCommand(get='self', isProperty=True) def negativeContourBase(self, value): self._wrappedData.negativeContourBase = value @property @_includeInCopy def negativeContourFactor(self) -> float: """level multiplier for negative contours""" return self._wrappedData.negativeContourFactor @negativeContourFactor.setter @logCommand(get='self', isProperty=True) def negativeContourFactor(self, value): self._wrappedData.negativeContourFactor = value @property @_includeInCopy def negativeContourColour(self) -> str: """colour of negative contours""" return self._wrappedData.negativeContourColour @negativeContourColour.setter @logCommand(get='self', isProperty=True) def negativeContourColour(self, value): self._wrappedData.negativeContourColour = value @property @_includeInCopy def includeNegativeContours(self): """Include flag for the negative contours """ result = self._getInternalParameter(self._INCLUDENEGATIVECONTOURS) if result is None: # default to True return True return result @includeNegativeContours.setter 
@logCommand(get='self', isProperty=True) def includeNegativeContours(self, value: bool): """Include flag for the negative contours """ if not isinstance(value, bool): raise ValueError("Spectrum.includeNegativeContours: must be True/False") self._setInternalParameter(self._INCLUDENEGATIVECONTOURS, value) @property @_includeInCopy def sliceColour(self) -> str: """colour of 1D slices """ return self._wrappedData.sliceColour @sliceColour.setter @logCommand(get='self', isProperty=True) def sliceColour(self, value): self._wrappedData.sliceColour = value # for spectrumView in self.spectrumViews: # spectrumView.setSliceColour() # ejb - update colour here @property @_includeInCopy def scale(self) -> float: """Scaling factor for data in the spectrum. """ value = self._wrappedData.scale if value is None: getLogger().warning('Scaling {} changed from None to 1.0'.format(self)) value = 1.0 if -SCALETOLERANCE < value < SCALETOLERANCE: getLogger().warning('Scaling {} by minimum tolerance (±{})'.format(self, SCALETOLERANCE)) return value @scale.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSimple() def scale(self, value: Union[float, int, None]): if value is None: getLogger().warning('Scaling {} changed from None to 1.0'.format(self)) value = 1.0 if not isinstance(value, (float, int)): raise TypeError('Spectrum.scale {} must be a float or integer'.format(self)) if value is not None and -SCALETOLERANCE < value < SCALETOLERANCE: # Display a warning, but allow to be set getLogger().warning('Scaling {} by minimum tolerance (±{})'.format(self, SCALETOLERANCE)) self._scaleChanged = True if value is None: self._wrappedData.scale = None else: self._wrappedData.scale = float(value) if self.dimensionCount == 1 and self._intensities is not None: # some 1D data were read before; update the intensities as the scale has changed self._intensities = self.getSliceData() @property @_includeInCopy def spinningRate(self) -> float: """NMR tube spinning rate (in Hz) """ return 
self._wrappedData.experiment.spinningRate @spinningRate.setter @logCommand(get='self', isProperty=True) def spinningRate(self, value: float): self._wrappedData.experiment.spinningRate = value @property @_includeInCopy def noiseLevel(self) -> (float, None): """Noise level for the spectrum """ noise = self._wrappedData.noiseLevel if noise is None: getLogger().debug2('Noise Level is None. Estimated') self.noiseLevel = noise = self.estimateNoise() return noise @noiseLevel.setter @logCommand(get='self', isProperty=True) def noiseLevel(self, value: float): val = float(value) if value is not None else None self._wrappedData.noiseLevel = val @property @_includeInCopy def negativeNoiseLevel(self) -> float: """Negative noise level value. Stored in Internal""" value = self._getInternalParameter(self._NEGATIVENOISELEVEL) if value is None: getLogger().debug2('Returning negativeNoiseLevel=None') return value @negativeNoiseLevel.setter @logCommand(get='self', isProperty=True) def negativeNoiseLevel(self, value): """Stored in Internal """ self._setInternalParameter(self._NEGATIVENOISELEVEL, value) @property def synonym(self) -> Optional[str]: """Systematic experiment type descriptor (CCPN system).""" refExperiment = self._wrappedData.experiment.refExperiment if refExperiment is None: return None else: return refExperiment.synonym @property @_includeInCopy def experimentType(self) -> Optional[str]: """Systematic experiment type descriptor (CCPN system).""" refExperiment = self._wrappedData.experiment.refExperiment if refExperiment is None: return None else: return refExperiment.name @experimentType.setter @logCommand(get='self', isProperty=True) def experimentType(self, value: str): from ccpn.core.lib.SpectrumLib import _setApiExpTransfers, _setApiRefExperiment, _clearLinkToRefExp if value is None: self._wrappedData.experiment.refExperiment = None self.experimentName = None _clearLinkToRefExp(self._wrappedData.experiment) return # nmrExpPrototype = 
self._wrappedData.root.findFirstNmrExpPrototype(name=value) # Why not findFirst instead of looping all sortedNmrExpPrototypes for nmrExpPrototype in self._wrappedData.root.sortedNmrExpPrototypes(): for refExperiment in nmrExpPrototype.sortedRefExperiments(): if refExperiment.name == value: # set API RefExperiment and ExpTransfer _setApiRefExperiment(self._wrappedData.experiment, refExperiment) _setApiExpTransfers(self._wrappedData.experiment) synonym = refExperiment.synonym if synonym: self.experimentName = synonym return # nothing found - error: raise ValueError('No reference experiment matches name "%s"' % value) @property def experiment(self): """Return the experiment assigned to the spectrum """ return self._wrappedData.experiment @property @_includeInCopy def experimentName(self) -> str: """Common experiment type descriptor (May not be unique). """ return self._wrappedData.experiment.name @experimentName.setter @logCommand(get='self', isProperty=True) def experimentName(self, value): # force to a string # because: reading from .nef files extracts the name from the end of the experiment_type in nef reader # which is not wrapped with quotes, so defaults to an int if it can? self._wrappedData.experiment.name = str(value) @property def filePath(self) -> Optional[str]: """Definition of the NMR (binary) dataSource file; can contain redirections (e.g. $DATA) Use Spectrum.path attribute for an absolute, decoded path """ if self._dataStore is None: raise RuntimeError('dataStore not defined') return str(self._dataStore.path) @filePath.setter @logCommand(get='self', isProperty=True) @ccpNmrV3CoreSetter() def filePath(self, value: str): if self._dataStore is None: raise RuntimeError('dataStore not defined') if value is None: self._close() self._dataStore.path = None return self._openFile(path=value, dataFormat=self.dataFormat, checkParameters=True) @property def dataFormat(self) -> str: """The spectrum data-format identifier (e.g. 
Hdf5, NMRPipe); Automatically determined upon creating newSpectrum from a path containing spectral data. (change at your own peril!) """ if self._dataStore is None: raise RuntimeError('dataStore not defined') return self._dataStore.dataFormat @dataFormat.setter def dataFormat(self, value): self._openFile(path=self.filePath, dataFormat=value, checkParameters=True) def _openFile(self, path:str, dataFormat, checkParameters=True): """Open the spectrum as defined by path, creating a dataSource object :param path: a path to the spectrum; may contain redirections (e.g. $DATA) :param dataFormat: a dataFormat defined by one of the SpectrumDataSource types CCPNMRINTERNAL: also used in nef loader """ if path is None: raise ValueError(f'Undefined path') self._close() newDataStore = DataStore.newFromPath(path=path, dataFormat=dataFormat) newDataStore.spectrum = self self._spectrumTraits.dataStore = newDataStore self._dataStore._saveInternal() if (newDataSource := self._getDataSource(dataStore=newDataStore, checkParameters=checkParameters)) is None: getLogger().warning('Spectrum._openFile: unable to open "%s"' % path) else: # we defined a new file self._spectrumTraits.dataSource = newDataSource self._saveSpectrumMetaData()
@logCommand(get='self')
def reload(self, path: Optional[str] = None):
    """Reload the spectrum data as defined by path.

    The dataFormat and dimensionality of the data at path need to match those of the
    current Spectrum instance. All other parameters are pulled from the (binary)
    spectrum data and, if a valid dataSource results, exported onto this Spectrum.

    :param path: a path to the spectrum; may contain redirections (e.g. $DATA).
                 Defaults to self.filePath when None.
    """
    # NOTE(review): original annotated `path:str = None` — an implicit Optional,
    # deprecated by PEP 484; made explicit here. Behavior is unchanged.
    if path is None:
        path = self.filePath

    # Close any currently open dataSource before re-opening from path.
    self._close()
    # checkParameters=False: parameters are (re)read from the data, not validated
    # against the current Spectrum settings.
    self._openFile(path=path, dataFormat=self.dataFormat, checkParameters=False)

    if self.dataSource is not None:
        # Pull all parameters from the freshly opened data; the path itself is
        # already set by _openFile, hence includePath=False.
        self.dataSource.exportToSpectrum(self, includePath=False)
@property
def path(self) -> Path:
    """Return a Path instance: the absolute, decoded form of filePath.

    :raises RuntimeError: when no dataStore has been defined for this spectrum.
    """
    store = self._dataStore
    if store is None:
        raise RuntimeError('dataStore not defined')
    return store.aPath()
def hasValidPath(self) -> bool:
    """Return True if this spectrum currently has a valid dataSource object
    whose path is itself valid; False otherwise.
    """
    source = self.dataSource
    if source is None:
        return False
    return source.hasValidPath()
def isEmptySpectrum(self) -> bool:
    """Return True if this instance refers to an empty spectrum,
    i.e. one without actual spectral data.

    :raises RuntimeError: when no dataStore has been defined for this spectrum.
    """
    store = self._dataStore
    if store is None:
        raise RuntimeError('dataStore not defined')
    # An empty spectrum is identified purely by its dataFormat marker.
    return store.dataFormat == EmptySpectrumDataSource.dataFormat
#----------------------------------------------------------------------------------------- # Dimensional Attributes #----------------------------------------------------------------------------------------- def _setDimensionalAttributes(self, attributeName: str, value: (list, tuple)): """Conveniance function set the spectrumReference.attributeName to the items of value Assumes all checks have been done """ specDims = self.spectrumDimensions # local copy to avoid getting it N-times for idx, val in enumerate(value): setattr(specDims[idx], attributeName, val) def _getDimensionalAttributes(self, attributeName: str) -> list: """Conveniance function get the values for each spectrumReference.attributeName """ specDims = self.spectrumDimensions # local copy to avoid getting it N-times return [getattr(specDim, attributeName) if hasattr(specDim, attributeName) else None for specDim in specDims] @property @_includeInDimensionalCopy def pointCounts(self) -> List[int]: """Number of points per dimension""" return self._getDimensionalAttributes('pointCount') @pointCounts.setter @checkSpectrumPropertyValue(iterable=True, types=(int, float)) def pointCounts(self, value: Sequence): self._setDimensionalAttributes('pointCount', value) # @property # def totalPointCounts(self) -> List[int]: # """Total number of points per dimension; i.e. 
twice pointCounts in case of complex data""" # result = self.pointCounts # for axis, isC in enumerate(self.isComplex): # if isC: # result[axis] *= 2 # return result @property @_includeInDimensionalCopy def isComplex(self) -> List[bool]: """Boolean denoting Complex data per dimension""" return self._getDimensionalAttributes('isComplex') @isComplex.setter @checkSpectrumPropertyValue(iterable=True, types=(bool, int, float)) def isComplex(self, value: Sequence): self._setDimensionalAttributes('isComplex', value) @property @_includeInDimensionalCopy def isAquisition(self) -> List[bool]: """Boolean per dimension denoting if it is the aquisition dimension""" return self._getDimensionalAttributes('isAcquisition') @isAquisition.setter @checkSpectrumPropertyValue(iterable=True, types=(bool, int, float)) def isAquisition(self, value: Sequence): trues = [val for val in value if val == True] if len(trues) > 1: raise ValueError('Spectrum.isAquisition: expected zero or one dimension; got %r' % value) self._setDimensionalAttributes('isAcquisition', value) @property @_includeInDimensionalCopy def axisCodes(self) -> List[Optional[str]]: """List of an unique axisCode per dimension""" return self._getDimensionalAttributes('axisCode') @axisCodes.setter @checkSpectrumPropertyValue(iterable=True, unique=True, allowNone=True, types=(str,)) def axisCodes(self, value): self._setDimensionalAttributes('axisCode', value) @property def acquisitionAxisCode(self) -> Optional[str]: """Axis code of acquisition axis - None if not known""" trues = [idx for idx, val in enumerate(self.isAquisition) if val == True] if len(trues) == 0: return None elif len(trues) == 1: return self.axisCodes[trues[0]] else: raise RuntimeError( 'Spectrum.aquisitionAxisCode: this should not happen; more then one dimension defined as acquisition dimension') @property def dimensionTypes(self) -> List[Optional[str]]: """Dimension types ('Time' / 'Frequency' / 'Sampled') per dimension""" return 
self._getDimensionalAttributes('dimensionType') @dimensionTypes.setter @checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(str,), enumerated=specLib.DIMENSIONTYPES) def dimensionTypes(self, value): self._setDimensionalAttributes('dimensionType', value) @property def isTimeDomains(self) -> list: """Conveniance: A list of booleans per dimension indicating if dimension is time domain """ return [(dimType == specLib.DIMENSION_TIME) for dimType in self.dimensionTypes] @property def isSampledDomains(self) -> list: """Conveniance: A list of booleans per dimension indicating if dimension is sampled """ return [(dimType == specLib.DIMENSION_SAMPLED) for dimType in self.dimensionTypes] @property @_includeInDimensionalCopy def spectralWidthsHz(self) -> List[float]: """spectral width (in Hz) per dimension""" return self._getDimensionalAttributes('spectralWidthHz') @spectralWidthsHz.setter @checkSpectrumPropertyValue(iterable=True, types=(float, int)) def spectralWidthsHz(self, value: Sequence): self._setDimensionalAttributes('spectralWidthHz', value) @property @_includeInDimensionalCopy def spectralWidths(self) -> List[float]: """spectral width (in ppm) per dimension """ return self._getDimensionalAttributes('spectralWidth') @spectralWidths.setter @checkSpectrumPropertyValue(iterable=True, types=(float, int)) def spectralWidths(self, value): self._setDimensionalAttributes('spectralWidth', value) @property def ppmPerPoints(self) -> List[float]: """Convenience; ppm-per-point for each dimension""" return self._getDimensionalAttributes('ppmPerPoint') @property def _valuePerPoints(self) -> List[Optional[float]]: """For backward compatibility; _valuePerPoint for each dimension CCPNINTERNAL: used by Peak.pointLineWidths """ return self._getDimensionalAttributes('_valuePerPoint') # @property # def valuesPerPoint(self) -> List[Optional[float]]: # """valuePerPoint for each dimension: # in ppm for Frequency dimensions # in time units (seconds) for Time (Fid) dimensions # 
#     1.0 for sampled dimensions
#     """
#     result = []
#     _widths = self.spectralWidths
#     _widthsHz = self.spectralWidthsHz
#     _pCounts = self.pointCounts
#     _isComplex = self.isComplex
#     for axis, dimType in enumerate(self.dimensionTypes):
#         if dimType == specLib.DIMENSION_FREQUENCY:
#             valuePerPoint = _widths[axis] / _pCounts[axis]
#         elif dimType == specLib.DIMENSION_TIME:
#             # valuePerPoint is dwell time
#             valuePerPoint = 1.0 / _widthsHz[axis] if _isComplex[axis] \
#                 else 0.5 / _widthsHz[axis]
#         elif dimType == specLib.DIMENSION_SAMPLED:
#             valuePerPoint = 1.0
#         else:
#             valuePerPoint = None
#         result.append(valuePerPoint)
#     return result

@property
@_includeInDimensionalCopy
def phases0(self) -> List[Optional[float]]:
    """Zero-order phase correction (or None), per dimension"""
    return self._getDimensionalAttributes('phase0')

@phases0.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(float, int))
def phases0(self, value: Sequence):
    self._setDimensionalAttributes('phase0', value)

@property
@_includeInDimensionalCopy
def phases1(self) -> List[Optional[float]]:
    """First-order phase correction (or None) per dimension"""
    return self._getDimensionalAttributes('phase1')

@phases1.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(float, int))
def phases1(self, value: Sequence):
    self._setDimensionalAttributes('phase1', value)

@property
@_includeInDimensionalCopy
def windowFunctions(self) -> List[Optional[str]]:
    """Window function name (or None); per dimension
    e.g. 'EM', 'GM', 'SINE', 'QSINE', ....
    (defined in SpectrumLib.WINDOW_FUNCTIONS)
    """
    return self._getDimensionalAttributes('windowFunction')

@windowFunctions.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(str,), enumerated=specLib.WINDOW_FUNCTIONS)
def windowFunctions(self, value: Sequence):
    self._setDimensionalAttributes('windowFunction', value)

@property
@_includeInDimensionalCopy
def lorentzianBroadenings(self) -> List[Optional[float]]:
    """Lorenzian broadening (in Hz) or None; per dimension"""
    return self._getDimensionalAttributes('lorentzianBroadening')

@lorentzianBroadenings.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(float, int))
def lorentzianBroadenings(self, value: Sequence):
    self._setDimensionalAttributes('lorentzianBroadening', value)

@property
@_includeInDimensionalCopy
def gaussianBroadenings(self) -> List[Optional[float]]:
    """Gaussian broadening or None; per dimension"""
    return self._getDimensionalAttributes('gaussianBroadening')

@gaussianBroadenings.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(float, int))
def gaussianBroadenings(self, value: Sequence):
    self._setDimensionalAttributes('gaussianBroadening', value)

@property
@_includeInDimensionalCopy
def sineWindowShifts(self) -> List[Optional[float]]:
    """Shift of sine/sine-square window function (in degrees) or None; per dimension"""
    return self._getDimensionalAttributes('sineWindowShift')

@sineWindowShifts.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(float, int))
def sineWindowShifts(self, value: Sequence):
    self._setDimensionalAttributes('sineWindowShift', value)

@property
@_includeInDimensionalCopy
def spectrometerFrequencies(self) -> List[float]:
    """Spectrometer frequency; per dimension"""
    return self._getDimensionalAttributes('spectrometerFrequency')

@spectrometerFrequencies.setter
@checkSpectrumPropertyValue(iterable=True, types=(float, int))
def spectrometerFrequencies(self, value):
    self._setDimensionalAttributes('spectrometerFrequency', value)

@property
@_includeInDimensionalCopy
def measurementTypes(self) -> List[Optional[str]]:
    """Type of value being measured, per dimension.
    In normal cases the measurementType will be 'Shift', but other values might be
    'MQSHift' (for multiple quantum axes), JCoupling (for J-resolved experiments),
    'T1', 'T2', --- defined SpectrumLib.MEASUREMENT_TYPES
    """
    return self._getDimensionalAttributes('measurementType')

@measurementTypes.setter
@checkSpectrumPropertyValue(iterable=True, types=(str,), enumerated=specLib.MEASUREMENT_TYPES,
                            mapping={'shift': 'Shift'}, allowNone=True)
def measurementTypes(self, value):
    self._setDimensionalAttributes('measurementType', value)

@property
@_includeInDimensionalCopy
def isotopeCodes(self) -> List[str]:
    """isotopeCode or None; per dimension"""
    return self._getDimensionalAttributes('isotopeCode')

@isotopeCodes.setter
@logCommand(get='self', isProperty=True)
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(str,))
def isotopeCodes(self, value: Sequence):
    self._setDimensionalAttributes('isotopeCode', value)

@property
@_includeInDimensionalCopy
def referenceExperimentDimensions(self) -> Tuple[Optional[str], ...]:
    """dimensions of reference experiment - None if no code"""
    result = []
    for dataDim in self._wrappedData.sortedDataDims():
        expDim = dataDim.expDim
        if expDim is None:
            result.append(None)
        else:
            # stored in the expDim's ccpnInternalData dict under 'expDimToRefExpDim'
            referenceExperimentDimension = (expDim.ccpnInternalData and
                                            expDim.ccpnInternalData.get('expDimToRefExpDim')) or None
            result.append(referenceExperimentDimension)
    return tuple(result)

@referenceExperimentDimensions.setter
@logCommand(get='self', isProperty=True)
@ccpNmrV3CoreSetter()
@checkSpectrumPropertyValue(iterable=True, unique=True, allowNone=True, types=(str,))
def referenceExperimentDimensions(self, values: Sequence):
    """Store per-dimension reference-experiment dimension names in each expDim's ccpnInternalData.
    Length/type/uniqueness validation is handled by the checkSpectrumPropertyValue decorator
    (superseding the commented-out checks below).
    """
    apiDataSource = self._wrappedData
    # if not isinstance(values, (tuple, list)):
    #     raise ValueError('referenceExperimentDimensions must be a list or tuple')
    # if len(values) != apiDataSource.numDim:
    #     raise ValueError('referenceExperimentDimensions must have length %s, was %s' % (apiDataSource.numDim, values))
    # if not all(isinstance(dimVal, (str, type(None))) for dimVal in values):
    #     raise ValueError('referenceExperimentDimensions must be str, None')
    # _vals = [val for val in values if val is not None]
    # if len(_vals) != len(set(_vals)):
    #     raise ValueError('referenceExperimentDimensions must be unique')

    #TODO: use self.spectrumDimensions and its attributes/methods (if needed add method)
    for ii, (dataDim, val) in enumerate(zip(apiDataSource.sortedDataDims(), values)):
        expDim = dataDim.expDim
        if expDim is None and val is not None:
            raise ValueError('Cannot set referenceExperimentDimension %s in dimension %s' % (val, ii + 1))
        else:
            _update = {'expDimToRefExpDim': val}
            _ccpnInt = expDim.ccpnInternalData
            if _ccpnInt is None:
                expDim.ccpnInternalData = _update
            else:
                # NOTE(review): _expDimCID is copied but never used; the existing dict is
                # mutated in place and then re-assigned to itself. Presumably the intent was
                # to update the COPY and assign that, so the API layer sees a new object -
                # TODO confirm and either use _expDimCID or drop the copy.
                _expDimCID = expDim.ccpnInternalData.copy()
                _ccpnInt.update(_update)
                expDim.ccpnInternalData = _ccpnInt
def getAvailableReferenceExperimentDimensions(self, _experimentType=None) -> tuple:
    """Return list of available reference experiment dimensions based on spectrum isotopeCodes

    :param _experimentType: optional experiment-type name; when given, the matching reference
                            experiment is looked up instead of the spectrum's current one
    :return: a tuple of axisCode-tuples whose nucleus codes match the spectrum's isotopes
    """
    refExp = None
    if _experimentType is not None:
        # search for the named reference experiment
        refExp = self.project._getReferenceExperimentFromType(_experimentType)
    # fall back on the reference experiment currently attached to the spectrum
    refExp = refExp or self._wrappedData.experiment.refExperiment
    if not refExp:
        return ()

    # nucleus codes derived from the current isotope codes, e.g. '15N' -> 'N'
    nucleusCodes = tuple(code.strip('0123456789') for code in self.isotopeCodes)

    # pair each permutation of the axisCodes with the identically-ordered permutation of the
    # nucleusCodes (permutations() yields index orders consistently for equal-length inputs),
    # keeping only those whose nuclei match the spectrum's
    matches = []
    for axisPerm, nucleusPerm in zip(permutations(refExp.axisCodes),
                                     permutations(refExp.nucleusCodes)):
        if nucleusPerm == nucleusCodes:
            matches.append(axisPerm)
    return tuple(matches)
@property
@_includeInDimensionalCopy
def foldingModes(self) -> List[Optional[str]]:
    """List of folding modes (values: 'circular', 'mirror', None); per dimension"""
    return self._getDimensionalAttributes('foldingMode')
    # dd = {True: 'mirror', False: 'circular', None: None}
    # return tuple(dd[x and x.isFolded] for x in self._mainExpDimRefs())

@foldingModes.setter
@logCommand(get='self', isProperty=True)
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(str,), enumerated=specLib.FOLDING_MODES)
def foldingModes(self, value):
    self._setDimensionalAttributes('foldingMode', value)
    # dd = {'circular': False, 'mirror': True, None: False}
    # if len(values) != self.dimensionCount:
    #     raise ValueError("Length of %s does not match number of dimensions." % str(values))
    # if not all(isinstance(dimVal, (str, type(None))) and dimVal in dd.keys() for dimVal in values):
    #     raise ValueError("Folding modes must be 'circular', 'mirror', None")
    # self._setExpDimRefAttribute('isFolded', [dd[x] for x in values])

@property
@_includeInDimensionalCopy
def axisUnits(self) -> List[Optional[str]]:
    """List of axis units (most commonly 'ppm') or None; per dimension"""
    return self._getDimensionalAttributes('axisUnit')

@axisUnits.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(str,))
def axisUnits(self, value):
    self._setDimensionalAttributes('axisUnit', value)

@property
@_includeInDimensionalCopy
def referencePoints(self) -> List[Optional[float]]:
    """List of points used for axis (chemical shift) referencing; per dimension.
    """
    return self._getDimensionalAttributes('referencePoint')

@referencePoints.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=False, types=(float, int))
def referencePoints(self, value):
    # NOTE: getter may return None entries but the setter disallows them (allowNone=False)
    self._setDimensionalAttributes('referencePoint', value)

@property
@_includeInDimensionalCopy
def referenceValues(self) -> List[Optional[float]]:
    """List of ppm-values used for axis (chemical shift) referencing; per dimension.
    """
    return self._getDimensionalAttributes('referenceValue')

@referenceValues.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=False, types=(float, int))
def referenceValues(self, value):
    self._setDimensionalAttributes('referenceValue', value)

@property
# @cached(_REFERENCESUBSTANCESCACHE, maxItems=5000, debug=False)
def referenceSubstances(self):
    """
    :return: a list of substances
    """
    # pids are stored internally; resolve them back into core objects
    pids = self._getInternalParameter(self._REFERENCESUBSTANCES) or []
    objs = _getObjectsByPids(self.project, pids)
    return objs

@referenceSubstances.setter
def referenceSubstances(self, substances):
    """Store the pids of the given Substance instances; non-Substance items are silently skipped.
    """
    from ccpn.core.Substance import Substance

    pids = [su.pid for su in substances if isinstance(su, Substance)]
    self._setInternalParameter(self._REFERENCESUBSTANCES, pids)

@property
def referenceSubstance(self):
    """ Deprecated. See referenceSubstances """
    getLogger().warning('spectrum.referenceSubstance is deprecated. Use referenceSubstances instead. ')
    substance = None
    if len(self.referenceSubstances) > 0:
        # return the last-added substance
        substance = self.referenceSubstances[-1]
    return substance

@referenceSubstance.setter
def referenceSubstance(self, substance):
    getLogger().warning('spectrum.referenceSubstance is deprecated. Use referenceSubstances instead. ')
    self.referenceSubstances = [substance]

@property
@_includeInDimensionalCopy
def assignmentTolerances(self) -> List[float]:
    """Assignment tolerance in axis unit (ppm); per dimension; set to default value on basis of isotopeCode
    """
    return self._getDimensionalAttributes('assignmentTolerance')

@assignmentTolerances.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(float, int))
def assignmentTolerances(self, value):
    self._setDimensionalAttributes('assignmentTolerance', value)

# @property
# def defaultAssignmentTolerances(self) -> List[Optional[float]]:
#     """Default assignment tolerances per dimension (in ppm), upward adjusted (if needed) for
#     digital resolution.
#     """
#     return self._getDimensionalAttributes('defaultAssignmentTolerance')

@property
@_includeInDimensionalCopy
def aliasingLimits(self) -> List[Tuple[float, float]]:
    """List of tuples of sorted(minAliasingLimit, maxAliasingLimit) per dimension.
    Setting these values will round them to the nearest multiple of the spectralWidth.
    """
    return self._getDimensionalAttributes('aliasingLimits')

@aliasingLimits.setter
@logCommand(get='self', isProperty=True)
@checkSpectrumPropertyValue(iterable=True, allowNone=False, types=(tuple, list))
def aliasingLimits(self, value):
    self._setDimensionalAttributes('aliasingLimits', value)

@property
def aliasingPointLimits(self) -> Tuple[Tuple[int, int], ...]:
    """Return a tuple of sorted(minAliasingPointLimit, maxAliasingPointLimit) per dimension.
    i.e. The actual point limits of the full (including the aliased regions) limits.
    """
    return tuple(self._getDimensionalAttributes('aliasingPointLimits'))

@property
def aliasingIndexes(self) -> Tuple[Tuple[int, int], ...]:
    """Return a tuple of the number of times the spectralWidth are folded in each dimension.
    This is a derived property from the aliasingLimits; setting aliasingIndexes value will
    alter the aliasingLimits parameter accordingly.
    """
    return tuple(self._getDimensionalAttributes('aliasingIndexes'))

@aliasingIndexes.setter
@checkSpectrumPropertyValue(iterable=True, allowNone=False, types=(tuple, list))
def aliasingIndexes(self, value):
    self._setDimensionalAttributes('aliasingIndexes', value)

# GWV, plural of index is indices
aliasingIndices = aliasingIndexes

@property
def spectrumLimits(self) -> List[Tuple[float, float]]:
    """list of tuples of (ppmPoint(1), ppmPoint(n)) for each dimension
    """
    return self._getDimensionalAttributes('spectrumLimits')

@property
def foldingLimits(self) -> List[Tuple[float, float]]:
    """list of tuples of (ppmPoint(0.5), ppmPoint(n+0.5)) for each dimension
    """
    return self._getDimensionalAttributes('foldingLimits')
def get1Dlimits(self):
    """Get the 1D spectrum ppm-limits and the intensity limits

    :return ((ppm1, ppm2), (minValue, maxValue)
    :raises RuntimeError: when the spectrum is not one-dimensional
    """
    if self.dimensionCount != 1:
        raise RuntimeError('Spectrum.get1Dlimits() is only implemented for 1D spectra')

    intensities = self.getSliceData()
    valueRange = (float(min(intensities)), float(max(intensities)))
    return (self.spectrumLimits[0], valueRange)
@property
def axesReversed(self) -> Tuple[Optional[bool], ...]:
    """Return True if the axis is reversed per dimension
    """
    return tuple(self._getDimensionalAttributes('isReversed'))

@axesReversed.setter
@ccpNmrV3CoreSetter()
def axesReversed(self, value):
    self._setDimensionalAttributes('isReversed', value)

@property
def magnetisationTransfers(self) -> Tuple[MagnetisationTransferTuple, ...]:
    """tuple of MagnetisationTransferTuple describing magnetisation transfer between
    the spectrum dimensions.

    MagnetisationTransferTuple is a namedtuple with the fields
    ['dimension1', 'dimension2', 'transferType', 'isIndirect'] of types [int, int, str, bool]
    The dimensions are dimension numbers (one-origin]
    transfertype is one of (in order of increasing priority):
    'onebond', 'Jcoupling', 'Jmultibond', 'relayed', 'relayed-alternate', 'through-space'
    isIndirect is used where there is more than one successive transfer step;
    it is combined with the highest-priority transferType in the transfer path.

    The magnetisationTransfers are deduced from the experimentType and axisCodes.
    Only when the experimentType is unset or does not match any known reference experiment
    magnetisationTransfers are kept separately in the API layer.
    """
    result = []
    apiExperiment = self._wrappedData.experiment
    apiRefExperiment = apiExperiment.refExperiment
    if apiRefExperiment:
        # We should use the refExperiment - if present
        magnetisationTransferDict = apiRefExperiment.magnetisationTransferDict()
        mainExpDimRefs = [dim._expDimRef for dim in self.spectrumReferences]
        refExpDimRefs = [x if x is None else x.refExpDimRef for x in mainExpDimRefs]
        # every unordered pair of (non-None) refExpDimRefs is looked up in the transfer dict
        for ii, rxdr in enumerate(refExpDimRefs):
            dim1 = ii + 1
            if rxdr is not None:
                for jj in range(dim1, len(refExpDimRefs)):
                    rxdr2 = refExpDimRefs[jj]
                    if rxdr2 is not None:
                        tt = magnetisationTransferDict.get(frozenset((rxdr, rxdr2)))
                        if tt:
                            result.append(MagnetisationTransferTuple(dim1, jj + 1, tt[0], tt[1]))
    else:
        # Without a refExperiment use parameters stored in the API (for reproducibility)
        ll = []
        for apiExpTransfer in apiExperiment.expTransfers:
            item = [x.expDim.dim for x in apiExpTransfer.expDimRefs]
            item.sort()
            item.append(apiExpTransfer.transferType)
            item.append(not (apiExpTransfer.isDirect))
            ll.append(item)
        for item in sorted(ll):
            result.append(MagnetisationTransferTuple(*item))
    #
    return tuple(result)

def _setMagnetisationTransfers(self, value: Tuple[MagnetisationTransferTuple, ...]):
    """Setter for magnetisation transfers

    The magnetisationTransfers are deduced from the experimentType and axisCodes.
    When the experimentType is set this function is a No-op.
    Only when the experimentType is unset or does not match any known reference experiment
    does this function set the magnetisation transfers, and the corresponding values are
    ignored if the experimentType is later set
    """
    apiExperiment = self._wrappedData.experiment
    apiRefExperiment = apiExperiment.refExperiment
    if apiRefExperiment is None:
        # replace all stored transfers with the supplied ones
        for et in apiExperiment.expTransfers:
            et.delete()
        mainExpDimRefs = [dim._expDimRef for dim in self.spectrumReferences]  # self._mainExpDimRefs()
        for tt in value:
            try:
                dim1, dim2, transferType, isIndirect = tt
                expDimRefs = (mainExpDimRefs[dim1 - 1], mainExpDimRefs[dim2 - 1])
            # NOTE(review): bare except - masks unrelated errors; should be
            # (ValueError, TypeError, IndexError)
            except:
                raise ValueError(
                        "Attempt to set incorrect magnetisationTransfer value %s in spectrum %s"
                        % (tt, self.pid)
                        )
            apiExperiment.newExpTransfer(expDimRefs=expDimRefs, transferType=transferType,
                                         isDirect=(not isIndirect))
    else:
        getLogger().warning(
                """An attempt to set Spectrum.magnetisationTransfers directly was ignored
                because the spectrum experimentType was defined.
                Use axisCodes to set magnetisation transfers instead.""")

@property
def intensities(self) -> SliceData:
    """ spectral intensities as a SliceData (i.e. NumPy array) for 1D spectra
    """
    if self.dimensionCount != 1:
        getLogger().warning('Currently this method only works for 1D spectra')
        return SliceData((self.pointCounts[0],))

    if self._intensities is None:
        # Assignment is Redundant as getSliceData does that;
        # Nevertheless for clarity
        self._intensities = self.getSliceData()
    return self._intensities

@intensities.setter
def intensities(self, value: numpy.array):
    self._intensities = value
    # NOTE:ED - temporary hack for showing straight the result of intensities change
    for spectrumView in self.spectrumViews:
        spectrumView.refreshData()

@property
def positions(self) -> numpy.array:
    """ spectral region in ppm as NumPy array for 1D spectra """
    if self.dimensionCount != 1:
        getLogger().warning('Currently this method only works for 1D spectra')
        return numpy.array([])

    if self._positions is None:
        # lazily computed from the ppm grid of dimension 1
        self._positions = self.getPpmArray(dimension=1)
    return self._positions

@positions.setter
def positions(self, value):
    # self._scaleChanged = True
    self._positions = value
    # NOTE:ED - temporary hack for showing straight the result of intensities change
    for spectrumView in self.spectrumViews:
        spectrumView.refreshData()

@property
@_includeInCopy
def displayFoldedContours(self):
    """Return whether the folded spectrum contours are to be displayed
    """
    result = self._getInternalParameter(self._DISPLAYFOLDEDCONTOURS)
    if result is None:
        # default to True
        return True
    return result

@displayFoldedContours.setter
def displayFoldedContours(self, value):
    """Set whether the folded spectrum contours are to be displayed
    """
    if not isinstance(value, bool):
        raise ValueError("Spectrum.displayFoldedContours: must be True/False.")
    self._setInternalParameter(self._DISPLAYFOLDEDCONTOURS, value)

@property
def _seriesItems(self):
    """Return a tuple of the series items for the spectrumGroups
    (None when no series items are stored; returns None for a group without an entry)
    """
    items = self._getInternalParameter(self._SERIESITEMS)
    if items is not None:
        series = ()
        for sg in self.spectrumGroups:
            if sg.pid in items:
                series += (items[sg.pid],)
            else:
                series += (None,)
        return series

@_seriesItems.setter
@ccpNmrV3CoreSetter()
def _seriesItems(self, items):
    """Set the series items for all spectrumGroups that spectrum is attached to.
    Must be of the form ( <item1>, <item2>, ... <itemN> )
    where <itemsN> are of the same type (or None)
    """
    # NOTE(review): the type(None) in the isinstance check is unreachable - `if not items`
    # above already raises for None; also a *list* input falls through to the final else
    # and clears the stored items - presumably only tuples are intended; TODO confirm
    if not items:
        raise ValueError('items is not defined')
    if not isinstance(items, (tuple, list, type(None))):
        raise TypeError('items is not of type tuple/None')
    if len(items) != len(self.spectrumGroups):
        raise ValueError('Number of items does not match number of spectrumGroups')

    if isinstance(items, tuple):
        # all items must share one type (None entries permitted)
        diffItems = set(type(item) for item in items)
        if len(diffItems) > 2 or (len(diffItems) == 2 and type(None) not in diffItems):
            raise ValueError('Items must be of the same type (or None)')

        seriesItems = self._getInternalParameter(self._SERIESITEMS)
        for sg, item in zip(self.spectrumGroups, items):
            if seriesItems:
                seriesItems[sg.pid] = item
            else:
                seriesItems = {sg.pid: item}
        self._setInternalParameter(self._SERIESITEMS, seriesItems)
    else:
        self._setInternalParameter(self._SERIESITEMS, None)

def _getSeriesItem(self, spectrumGroup):
    """Return the series item for the current spectrum for the selected spectrumGroup
    """
    from ccpn.core.SpectrumGroup import SpectrumGroup

    # accept a SpectrumGroup instance or its pid
    spectrumGroup = self.project.getByPid(spectrumGroup) if isinstance(spectrumGroup, str) else spectrumGroup
    if not isinstance(spectrumGroup, SpectrumGroup):
        raise TypeError('%s is not a spectrumGroup' % str(spectrumGroup))
    if self not in spectrumGroup.spectra:
        raise ValueError('Spectrum %s does not belong to spectrumGroup %s' % (str(self), str(spectrumGroup)))

    seriesItems = self._getInternalParameter(self._SERIESITEMS)
    if seriesItems and spectrumGroup.pid in seriesItems:
        return seriesItems[spectrumGroup.pid]

def _setSeriesItem(self, spectrumGroup, item):
    """Set the series item for the current spectrum for the selected spectrumGroup
    MUST be called from spectrumGroup - error checking for item types is handled there
    """
    from ccpn.core.SpectrumGroup import SpectrumGroup

    # check that the spectrumGroup and spectrum are valid
    spectrumGroup = self.project.getByPid(spectrumGroup) if isinstance(spectrumGroup, str) else spectrumGroup
    if not isinstance(spectrumGroup, SpectrumGroup):
        # NOTE(review): comma instead of %-formatting - the message is never interpolated;
        # should be TypeError('%s is not a spectrumGroup' % spectrumGroup)
        raise TypeError('%s is not a spectrumGroup', spectrumGroup)
    if self not in spectrumGroup.spectra:
        raise ValueError('Spectrum %s does not belong to spectrumGroup %s' % (str(self), str(spectrumGroup)))

    seriesItems = self._getInternalParameter(self._SERIESITEMS)
    if seriesItems:
        seriesItems[spectrumGroup.pid] = item
    else:
        seriesItems = {spectrumGroup.pid: item}
    self._setInternalParameter(self._SERIESITEMS, seriesItems)

def _renameSeriesItems(self, spectrumGroup, oldPid):
    """rename the keys in the seriesItems to reflect the updated spectrumGroup name
    """
    seriesItems = self._getInternalParameter(self._SERIESITEMS)
    if oldPid in (seriesItems if seriesItems else ()):
        # insert new items with the new pid
        oldItems = seriesItems[oldPid]
        del seriesItems[oldPid]
        seriesItems[spectrumGroup.pid] = oldItems
        self._setInternalParameter(self._SERIESITEMS, seriesItems)

def _getSeriesItemsById(self, pid):
    """Return the series item for the current spectrum by 'pid'
    CCPNINTERNAL: used in creating new spectrumGroups - not for external use
    """
    seriesItems = self._getInternalParameter(self._SERIESITEMS)
    if seriesItems and pid in seriesItems:
        return seriesItems[pid]

def _setSeriesItemsById(self, pid, item):
    """Set the series item for the current spectrum by 'pid'
    CCPNINTERNAL: used in creating new spectrumGroups - not for external use
    """
    seriesItems = self._getInternalParameter(self._SERIESITEMS)
    if seriesItems:
        seriesItems[pid] = item
    else:
        seriesItems = {pid: item}
    self._setInternalParameter(self._SERIESITEMS, seriesItems)

def _removeSeriesItemsById(self, pid):
    """Remove the keys in the seriesItems allocated to 'pid'
    CCPNINTERNAL: used in creating new spectrumGroups - not for external use
    """
    # useful for storing an item
    seriesItems = self._getInternalParameter(self._SERIESITEMS)
    # NOTE(review): unlike the sibling methods there is no None-guard here -
    # `pid in seriesItems` raises TypeError when no series items were ever stored
    if pid in seriesItems:
        del seriesItems[pid]
        self._setInternalParameter(self._SERIESITEMS, seriesItems)

@property
def temperature(self):
    """The temperature of the spectrometer when the spectrum was recorded
    (None when there is no experiment)
    """
    if self._wrappedData.experiment:
        return self._wrappedData.experiment.temperature

@temperature.setter
def temperature(self, value):
    """The temperature of the spectrometer when the spectrum was recorded
    (silently ignored when there is no experiment)
    """
    if self._wrappedData.experiment:
        self._wrappedData.experiment.temperature = value

@property
def _preferredAxisOrdering(self):
    """Return the preferred ordering for the axes (i.e zero-based);
    e.g. used when opening a new spectrumDisplay
    """
    result = self._getInternalParameter(self._PREFERREDAXISORDERING)
    if result is None:
        # default to the native dimension order
        result = self.dimensionIndices
    return result

@_preferredAxisOrdering.setter
@checkSpectrumPropertyValue(iterable=True, unique=True, types=(int,))
def _preferredAxisOrdering(self, value):
    self._setInternalParameter(self._PREFERREDAXISORDERING, value)
@checkSpectrumPropertyValue(iterable=True, unique=True, types=(int,))
def setPreferredDimensionOrdering(self, dimensionOrder):
    """Set the preferred dimension ordering

    :param dimensionOrder: tuple/list of dimensions (1-based; len dimensionCount)
    """
    # convert the 1-based dimensions to the 0-based axis indices stored internally
    self._preferredAxisOrdering = [dim - 1 for dim in dimensionOrder]
def _setDefaultAxisOrdering(self):
    """Set the default axis ordering based on some hierarchy rules
    (defined in the core/lib/SpectrumLib.py file)
    """
    # delegates to the module-level helper of the same name (name resolves to the
    # module global, not this method)
    _setDefaultAxisOrdering(self)

#-----------------------------------------------------------------------------------------
# Library functions
#-----------------------------------------------------------------------------------------
def ppm2point(self, value, axisCode=None, dimension=None):
    """Convert ppm value to point value for axis corresponding to either axisCode or dimension (1-based)

    :param value: the ppm value to convert
    :param axisCode: optional axisCode selecting the axis (mutually exclusive with dimension)
    :param dimension: optional 1-based dimension (mutually exclusive with axisCode)
    :return: the point value
    """
    haveAxisCode = axisCode is not None
    haveDimension = dimension is not None
    if not (haveAxisCode or haveDimension):
        raise ValueError('Spectrum.ppm2point: either axisCode or dimension needs to be defined')
    if haveAxisCode and haveDimension:
        raise ValueError('Spectrum.ppm2point: axisCode and dimension cannot be both defined')

    if haveAxisCode:
        # map the (possibly partial) axisCode onto its dimension
        dimension = self.getByAxisCodes('dimensions', [axisCode], exactMatch=False)[0]

    if dimension is None or not (1 <= dimension <= self.dimensionCount):
        raise RuntimeError('Invalid dimension (%s)' % (dimension,))

    return self.spectrumDimensions[dimension - 1].valueToPoint(value)
def point2ppm(self, value, axisCode=None, dimension=None):
    """Convert point value to ppm for axis corresponding to to either axisCode or dimension (1-based)

    :param value: the point value to convert
    :param axisCode: optional axisCode selecting the axis (mutually exclusive with dimension)
    :param dimension: optional 1-based dimension (mutually exclusive with axisCode)
    :return: the ppm value
    """
    haveAxisCode = axisCode is not None
    haveDimension = dimension is not None
    if not (haveAxisCode or haveDimension):
        raise ValueError('Spectrum.point2ppm: either axisCode or dimension needs to be defined')
    if haveAxisCode and haveDimension:
        raise ValueError('Spectrum.point2ppm: axisCode and dimension cannot be both defined')

    if haveAxisCode:
        # map the (possibly partial) axisCode onto its dimension
        dimension = self.getByAxisCodes('dimensions', [axisCode], exactMatch=False)[0]

    if dimension is None or not (1 <= dimension <= self.dimensionCount):
        raise RuntimeError('Invalid dimension (%s)' % (dimension,))

    return self.spectrumDimensions[dimension - 1].pointToValue(value)
def getPpmArray(self, axisCode=None, dimension=None) -> numpy.array:
    """Return a numpy array with ppm values of the grid points along axisCode or dimension

    :param axisCode: optional axisCode selecting the axis (mutually exclusive with dimension)
    :param dimension: optional 1-based dimension (mutually exclusive with axisCode)
    :return: numpy array of pointCounts[dim-1] ppm values spanning the spectrum limits
    """
    haveAxisCode = axisCode is not None
    haveDimension = dimension is not None
    if not (haveAxisCode or haveDimension):
        raise ValueError('Spectrum.getPpmArray: either axisCode or dimension needs to be defined')
    if haveAxisCode and haveDimension:
        raise ValueError('Spectrum.getPpmArray: axisCode and dimension cannot be both defined')

    if haveAxisCode:
        # map the (possibly partial) axisCode onto its dimension
        dimension = self.getByAxisCodes('dimensions', [axisCode], exactMatch=False)[0]

    if dimension is None or not (1 <= dimension <= self.dimensionCount):
        raise RuntimeError('Invalid dimension (%s)' % (dimension,))

    firstPpm, lastPpm = self.spectrumLimits[dimension - 1]
    return numpy.linspace(firstPpm, lastPpm, self.pointCounts[dimension - 1])
# def _verifyAxisCodeDimension(self, axisCode, dimension):
#     """Verify the axisCode and dimension
#     Return the aliasing information for the given axis
#     """
#     if dimension is None and axisCode is None:
#         raise ValueError('Spectrum._verifyAxisCodeDimension: either axisCode or dimension needs to be defined')
#     if dimension is not None and axisCode is not None:
#         raise ValueError('Spectrum._verifyAxisCodeDimension: axisCode and dimension cannot be both defined')
#     if axisCode is not None:
#         dimension = self.getByAxisCodes('dimensions', [axisCode], exactMatch=False)[0]
#     if dimension is None or dimension < 1 or dimension > self.dimensionCount:
#         raise RuntimeError('Invalid dimension (%s)' % (dimension,))
#
#     aliasLims = self.aliasingLimits[dimension - 1]
#     axisRevd = self.axesReversed[dimension - 1]
#     pCount = self.pointCounts[dimension - 1]
#     vpp = self.valuesPerPoint[dimension - 1] * 0.5  # offset for aliasingLimits
#     if axisRevd:
#         aliasLims = list(reversed(aliasLims))
#         vpp = -vpp
#     ppmL, ppmR = aliasLims[0] + vpp, aliasLims[1] - vpp
#     pL, pR = round(self.ppm2point(ppmL, dimension=dimension)), round(self.ppm2point(ppmR, dimension=dimension))
#
#     # clip to the maximum allowed aliasing limits
#     pL = min((MAXALIASINGRANGE + 1) * pCount, max(-MAXALIASINGRANGE * pCount, pL))
#     pR = min((MAXALIASINGRANGE + 1) * pCount, max(-MAXALIASINGRANGE * pCount, pR))
#     return ppmL, ppmR, pL, pR

# def getPpmAliasingLimitsArray(self, axisCode=None, dimension=None) -> numpy.array:
#     """Return a numpy array of ppm values of the grid points along axisCode or dimension
#     for the points contained by the aliasing limits, end points are inclusive
#     """
#     ppmL, ppmR, pL, pR = self._verifyAxisCodeDimension(axisCode, dimension)
#     return numpy.linspace(ppmL, ppmR, pR - pL + 1)
#
# def getPpmAliasingLimits(self, axisCode=None, dimension=None):
#     """Return a tuple of ppm values of the (first, last) grid points along axisCode or dimension
#     for the points contained by the aliasing limits, end points are inclusive
#     """
#     ppmL, ppmR, _tmp1, _tmp2 = self._verifyAxisCodeDimension(axisCode, dimension)
#     return (ppmL, ppmR)

# def getPointAliasingLimitsArray(self, axisCode=None, dimension=None) -> numpy.array:
#     """Return a numpy array with point values of the grid points along axisCode or dimension
#     """
#     _tmp1, _tmp2, pL, pR = self._verifyAxisCodeDimension(axisCode, dimension)
#     return numpy.linspace(pL, pR, pR - pL + 1)

# def getPointAliasingLimits(self, axisCode=None, dimension=None):
#     """Return a tuple of point values of the (first, last) grid points along axisCode or dimension
#     """
#     _tmp1, _tmp2, pL, pR = self._verifyAxisCodeDimension(axisCode, dimension)
#     return (pL, pR)

# def automaticIntegration(self, spectralData):
#     return self._apiDataSource.automaticIntegration(spectralData)

def _mapAxisCodes(self, axisCodes: Sequence[str]) -> list:
    """Map axisCodes on self.axisCodes

    :param axisCodes: iterable of axisCodes to map (eg. 'H' maps onto 'Hn')
    :return mapped axisCodes as list

    CCPNMRINTERNAL: used in SpectrumDisplay._getDimensionsMapping()
    """
    # find the map of newAxisCodeOrder to self.axisCodes; eg. 'H' to 'Hn'
    mapping = getAxisCodeMatch(axisCodes, self.axisCodes)
    if not mapping:
        raise ValueError('axisCodes %r contains an invalid element' % axisCodes)
    return [mapping[code] for code in axisCodes]
def orderByAxisCodes(self, iterable, axisCodes: Sequence[str] = None, exactMatch: bool = False) -> list:
    """Return a list with values of an iterable in order defined by axisCodes (default order if None).
    Perform a mapping if exactMatch=False (eg. 'H' to 'Hn')

    :param iterable: an iterable (tuple, list)
    :param axisCodes: a tuple or list of axisCodes
    :param exactMatch: a boolean optional testing for an exact match with the instance axisCodes
    :return: the values defined by iterable in axisCode order

    Related:
    Use getByDimensions() for dimensions (1..dimensionCount) based access of dimensional
    parameters of the Spectrum class.
    Use getByAxisCodes() for axisCode based access of dimensional parameters of the
    Spectrum class.
    """
    from ccpn.core.lib.SpectrumLib import _orderByDimensions

    if axisCodes is None:
        axisCodes = self.axisCodes
    else:
        if not isIterable(axisCodes):
            raise ValueError('%s.orderByAxisCodes: axisCodes is not iterable "%s"; expected list or tuple' %
                             (self.className, axisCodes))
        if not exactMatch:
            # optionally map partial axisCodes onto the instance ones
            _axisCodes = self._mapAxisCodes(axisCodes)
            if _axisCodes is None:
                raise ValueError('%s.orderByAxisCodes: Failed mapping axisCodes "%s"' %
                                 (self.className, axisCodes))
            axisCodes = _axisCodes

    # we now should have valid axisCodes
    for code in axisCodes:
        if code not in self.axisCodes:
            raise ValueError('%s.orderByAxisCodes: invalid axisCode "%s" in %r' %
                             (self.className, code, axisCodes))

    # axisCode -> dimension lookup, then the values of iterable in that dimension order
    codeToDimension = {ac: dim for _dimIndx, ac, dim in self.dimensionTriples}
    dimensions = [codeToDimension[code] for code in axisCodes]
    return _orderByDimensions(iterable, dimensions=dimensions, dimensionCount=self.dimensionCount)
def getByAxisCodes(self, parameterName: str, axisCodes: Sequence[str] = None, exactMatch: bool = False) -> list:
    """Return a list of values defined by parameterName in order defined by axisCodes
    (default order if None).
    Perform a mapping if exactMatch=False (eg. 'H' to 'Hn')

    :param parameterName: a str denoting a Spectrum dimensional attribute
    :param axisCodes: a tuple or list of axisCodes
    :param exactMatch: a boolean optional testing for an exact match with the instance axisCodes
    :return: the values defined by parameterName in axisCode order

    Related:
    Use getByDimensions() for dimensions (1..dimensionCount) based access of dimensional
    parameters of the Spectrum class.
    """
    from ccpn.core.lib.SpectrumLib import _getParameterValues

    # translate the (optional) axisCodes into a dimension ordering
    dimensions = (self.dimensions if axisCodes is None else
                  self.orderByAxisCodes(self.dimensions, axisCodes=axisCodes, exactMatch=exactMatch))

    try:
        result = _getParameterValues(self, parameterName,
                                     dimensions=dimensions, dimensionCount=self.dimensionCount)
    except ValueError as es:
        raise ValueError('%s.getByAxisCodes: %s' % (self.__class__.__name__, str(es)))
    return result
def setByAxisCodes(self, parameterName: str, values: Sequence, axisCodes: Sequence[str] = None,
                   exactMatch: bool = False) -> list:
    """Set attributeName to values in order defined by axisCodes (default order if None)
    Perform a mapping if exactMatch=False (eg. 'H' to 'Hn')

    :param parameterName: a str denoting a Spectrum dimensional attribute
    :param values: an iterable with values
    :param axisCodes: a tuple or list of axisCodes
    :param exactMatch: a boolean optional testing for an exact match with the instance axisCodes
    :return: a list of newly set values of parameterName (in default order)

    Related:
    Use setByDimensions() for dimensions (1..dimensionCount) based setting of dimensional
    parameters of the Spectrum class.
    """
    from ccpn.core.lib.SpectrumLib import _setParameterValues

    # translate the (optional) axisCodes into a dimension ordering
    dimensions = (self.dimensions if axisCodes is None else
                  self.orderByAxisCodes(self.dimensions, axisCodes=axisCodes, exactMatch=exactMatch))

    try:
        result = _setParameterValues(self, parameterName, values,
                                     dimensions=dimensions, dimensionCount=self.dimensionCount)
    except ValueError as es:
        raise ValueError('%s.setByAxisCodes: %s' % (self.__class__.__name__, str(es)))
    return result
def orderByDimensions(self, iterable, dimensions=None) -> list:
    """Return a list of values of iterable in order defined by dimensions (default order if None).

    :param iterable: an iterable (tuple, list)
    :param dimensions: a tuple or list of dimensions (1..dimensionCount)
    :return: a list with values defined by iterable in dimensions order
    """
    from ccpn.core.lib.SpectrumLib import _orderByDimensions

    dims = self.dimensions if dimensions is None else dimensions
    return _orderByDimensions(iterable, dimensions=dims, dimensionCount=self.dimensionCount)
def getByDimensions(self, parameterName: str, dimensions: Sequence[int] = None) -> list:
    """Return a list of values of Spectrum dimensional attribute parameterName in order defined
    by dimensions (default order if None).

    :param parameterName: a str denoting a Spectrum dimensional attribute
    :param dimensions: a tuple or list of dimensions (1..dimensionCount)
    :return: a list of values defined by parameterName in dimensions order

    Related:
    Use getByAxisCodes() for axisCode based access of dimensional parameters of the
    Spectrum class.
    """
    from ccpn.core.lib.SpectrumLib import _getParameterValues

    dims = self.dimensions if dimensions is None else dimensions
    try:
        result = _getParameterValues(self, parameterName,
                                     dimensions=dims, dimensionCount=self.dimensionCount)
    except ValueError as es:
        raise ValueError('%s.getByDimensions: %s' % (self.__class__.__name__, str(es)))
    return result
def setByDimensions(self, parameterName: str, values: Sequence, dimensions: Sequence[int] = None) -> list:
    """Set Spectrum dimensional attribute parameterName to values in the order as defined by
    dimensions (1..dimensionCount)(default order if None)

    :param parameterName: a str denoting a Spectrum dimensional attribute
    :param values: a list of values to set for each dimension
    :param dimensions: a tuple or list of dimensions (1..dimensionCount)
    :return: a list of newly set values of parameterName (in default order)

    Related:
    Use setByAxisCodes() for axisCode based setting of dimensional parameters of the
    Spectrum class.
    """
    from ccpn.core.lib.SpectrumLib import _setParameterValues

    dims = self.dimensions if dimensions is None else dimensions
    try:
        result = _setParameterValues(self, parameterName, values,
                                     dimensions=dims, dimensionCount=self.dimensionCount)
    except ValueError as es:
        raise ValueError('%s.setByDimensions: %s' % (self.__class__.__name__, str(es)))
    return result
def _setDefaultContourValues(self, base=None, multiplier=1.41, count=10): """Set default contour values """ if base is None: base = self.noiseLevel * multiplier if self.noiseLevel else 1e6 base = max(base, 1.0) # Contour bases have to be > 0.0 self.positiveContourBase = base self.positiveContourFactor = multiplier self.positiveContourCount = count self.negativeContourBase = -1.0 * base self.negativeContourFactor = multiplier self.negativeContourCount = count def _setDefaultContourColours(self): """Set default contour colours """ (self.positiveContourColour, self.negativeContourColour) = getDefaultSpectrumColours(self) self.sliceColour = self.positiveContourColour
def getPeakAliasingRanges(self):
    """Return the merged min/max aliasing values over the peakLists of the spectrum,
    or None if there are no peakLists with peaks.

    :return: tuple of (min, max) aliasing values per dimension (a single peakList's
             ranges are returned unmerged), or None
    """
    # get the aliasingRanges for non-empty peakLists only
    aliasRanges = [peakList.getPeakAliasingRanges() for peakList in self.peakLists if peakList.peaks]
    if not aliasRanges:
        return None

    # if there is only one then return it (for clarity)
    if len(aliasRanges) == 1:
        return aliasRanges[0]

    # merge the per-dimension (min, max) ranges across all peakLists
    # (the original looped with an unused enumerate index)
    newRanges = list(aliasRanges[0])
    for alias in aliasRanges[1:]:
        if alias is not None:
            newRanges = tuple((min(minL, minR), max(maxL, maxR))
                              for (minL, maxL), (minR, maxR) in zip(alias, newRanges))
    return newRanges
def _copyDimensionalParameters(self, axisCodes, target):
    """Copy all per-dimension parameters for axisCodes from self onto target.

    :param axisCodes: axisCodes selecting the dimensions to copy
    :param target: target Spectrum instance
    """
    for attr in _includeInCopyList().getMultiDimensional():
        try:
            # read in axisCode order from self; write in the same order to target
            values = self.getByAxisCodes(attr, axisCodes, exactMatch=True)
            target.setByAxisCodes(attr, values, axisCodes, exactMatch=True)
        except Exception as es:
            getLogger().error('Copying "%s" from %s to %s for axisCodes %s: %s' %
                              (attr, self, target, axisCodes, es)
                              )


def _copyNonDimensionalParameters(self, target):
    """Copy all non-dimensional parameters from self onto target.

    :param target: target Spectrum instance
    """
    for parameterName in _includeInCopyList().getNoneDimensional():
        try:
            setattr(target, parameterName, getattr(self, parameterName))
        except Exception as es:
            getLogger().warning('Copying parameter %r from %s to %s: %s' %
                                (parameterName, self, target, es))
def copyParameters(self, axisCodes, target):
    """Copy the non-dimensional parameters and, for axisCodes, the dimensional
    parameters from self onto target.

    :param axisCodes: axisCodes selecting the dimensions to copy
    :param target: target Spectrum instance
    """
    # non-dimensional first, then the per-dimension values
    self._copyNonDimensionalParameters(target)
    self._copyDimensionalParameters(axisCodes, target)
def estimateNoise(self):
    """Estimate and return the noise level, or None if it cannot be determined
    (i.e. when no dataSource is defined).
    """
    return None if self.dataSource is None else self.dataSource.estimateNoise()
#----------------------------------------------------------------------------------------- # data access functions #-----------------------------------------------------------------------------------------
def isBuffered(self):
    """Return True if the dataSource of the spectrum is buffered;
    False when no dataSource is defined.
    """
    return self.dataSource is not None and self.dataSource.isBuffered
def setBuffering(self, isBuffered: bool, path: str = None):
    """Set Hdf5-buffering.

    Buffering status is retained (upon exit/save) if path is None; i.e. autobuffering,
    until buffering is disabled by isBuffered=False.

    :param isBuffered: set the buffering status
    :param path: store hdf5buffer file at path; implies non-temporary buffer
    """
    if self.dataSource is None:
        # no data to buffer
        getLogger().warning('No proper (filePath, dataFormat) set for %s' % self)
        return

    if isBuffered:
        # a temporary buffer is used unless an explicit path was given
        bufferIsTemporary = (path is None)
        if path is not None:
            # resolve the explicit path into a versioned Hdf5 DataStore
            _bufferStore = DataStore.newFromPath(path=path,
                                                 autoVersioning=True,
                                                 dataFormat=Hdf5SpectrumDataSource.dataFormat,
                                                 withSuffix=Hdf5SpectrumDataSource.suffixes[0]
                                                 )
            path = _bufferStore.aPath()
            self._dataStore.useBuffer = False  # Explicit path, no autobuffering
        else:
            self._dataStore.useBuffer = True
        self.dataSource.setBuffering(isBuffered=True, bufferIsTemporary=bufferIsTemporary,
                                     bufferPath=path)

    else:
        # Turn off buffering; close any open hdf5 buffer first
        if self.dataSource.isBuffered:
            self.dataSource.closeHdf5Buffer()
        self.dataSource.setBuffering(isBuffered=False)
        self._dataStore.useBuffer = False

    # persist the (changed) buffering state
    self._dataStore._saveInternal()
@logCommand(get='self')
def getIntensity(self, ppmPositions) -> float:
    """Deprecated: return the interpolated height at the ppm position.

    The height is not derived from any fitting, but is a weighted average of the
    values at the neighbouring grid points.

    :param ppmPositions: iterable of ppm values, one per dimension
    :return: interpolated height
    """
    getLogger().warning('This routine has been replaced with getHeight')
    return self.getHeight(ppmPositions)
@logCommand(get='self')
def getHeight(self, ppmPositions) -> float:
    """Return the interpolated height at the given ppm position.

    :param ppmPositions: iterable of dimensionCount ppm values
    :return: interpolated height
    :raises ValueError: on wrong length or non-numeric values
    """
    if len(ppmPositions) != self.dimensionCount:
        raise ValueError('Length of %s does not match number of dimensions' % str(ppmPositions))
    for dimVal in ppmPositions:
        if not isinstance(dimVal, (int, float)):
            raise ValueError('ppmPositions values must be floats')

    # convert ppm -> (1-based, float) point positions per dimension, then interpolate
    pointPositions = [self.ppm2point(p, dimension=dim)
                      for dim, p in enumerate(ppmPositions, start=1)]
    return self.getPointValue(pointPositions)
@logCommand(get='self')
def getPointValue(self, pointPositions) -> float:
    """Return the value interpolated at the position given in points (1-based, float values).

    :param pointPositions: iterable of dimensionCount point positions (1-based, float)
    :return: interpolated, scale-corrected value; 0.0 when no dataSource is defined
    :raises ValueError: on wrong length or non-numeric values
    """
    if len(pointPositions) != self.dimensionCount:
        raise ValueError('Length of %s does not match number of dimensions.' % str(pointPositions))
    if not all(isinstance(dimVal, (int, float)) for dimVal in pointPositions):
        raise ValueError('position values must be floats.')

    if self.dataSource is None:
        # no data on disk; report and return a neutral value
        getLogger().warning('No proper (filePath, dataFormat) set for %s; Returning zero' % self)
        return 0.0

    # need to check folding/circular/±sign
    # pointPositions = []
    # aliasing = []
    # for idx, (p, np) in enumerate(zip(ppmPos, spectrum.pointCounts)):
    #     pp = spectrum.ppm2point(p, dimension=idx + 1)
    #     pointPositions.append(((pp - 1) % np) + 1)
    #     aliasing.append((pp - 1) // np)

    aliasingFlags = [1] * self.dimensionCount
    value = self.dataSource.getPointValue(pointPositions, aliasingFlags=aliasingFlags)
    # apply the spectrum scale to the raw data value
    return value * self.scale
@logCommand(get='self')
def getSliceData(self, position=None, sliceDim: int = 1) -> SliceData:
    """Get a slice defined by sliceDim and a position vector as a SliceData object
    (i.e. a numpy array).

    :param position: an optional list/tuple of point positions (1-based); defaults to all 1's
    :param sliceDim: dimension of the slice axis (1-based); defaults to 1
    :return: SliceData 1D data array

    NB: use getSlice() method for axisCode based access
    """
    if self.dataSource is None:
        # no data on disk: return a zero-filled slice of the right shape
        getLogger().warning('No proper (filePath, dataFormat) set for %s; Returning zeros only' % self)
        data = SliceData(shape=(self.pointCounts[sliceDim - 1],),
                         dimensions=(sliceDim,),
                         position=[1] * self.dimensionCount)
    else:
        try:
            validPosition = self.dataSource.checkForValidSlice(position, sliceDim)
            raw = self.dataSource.getSliceData(position=validPosition, sliceDim=sliceDim)
            # copy to preserve the original data, then apply the spectrum scale
            data = raw.copy(order='K') * self.scale
        except (RuntimeError, ValueError) as es:
            getLogger().error('Spectrum.getSliceData: %s' % es)
            raise es

    # For 1D, save as intensities attribute
    self._intensities = data
    return data
@logCommand(get='self')
def getSlice(self, axisCode, position) -> SliceData:
    """Get a slice defined by axisCode and a position vector as a SliceData object
    (i.e. a numpy array).

    :param axisCode: valid axisCode of the slice axis
    :param position: a list/tuple of point positions (1-based)
    :return: SliceData 1D data array

    NB: use getSliceData() method for dimension based access
    """
    # map the axisCode onto its (1-based) dimension and delegate
    sliceDim = self.getByAxisCodes('dimensions', [axisCode], exactMatch=True)[0]
    return self.getSliceData(position=position, sliceDim=sliceDim)
def setSliceData(self, data, position: Sequence = None, sliceDim: int = 1):
    """Write data as the slice defined by sliceDim and position (both 1-based).

    :param data: 1D data array to store
    :param position: a list/tuple of point positions (1-based)
    :param sliceDim: dimension of the slice axis (1-based)
    """
    if self.dataSource is None:
        getLogger().warning('No proper (filePath, dataFormat) set for %s; cannot set plane data' % self)
        return

    try:
        validPosition = self.dataSource.checkForValidSlice(position, sliceDim=sliceDim)
    except (RuntimeError, ValueError) as es:
        getLogger().error('invalid arguments: %s' % es)
        raise es

    self.dataSource.setSliceData(data=data, position=validPosition, sliceDim=sliceDim)
@logCommand(get='self')
def extractSliceToFile(self, axisCode, position, path=None, dataFormat='Hdf5'):
    """Extract 1D slice from self as new Spectrum instance; saved to path
    (autogenerated if None). If self is 1D this effectively yields a copy of self.

    :param axisCode: axisCode of slice to extract
    :param position: position vector (1-based)
    :param path: optional path; if None, constructed from current filePath
    :param dataFormat: string identifier for dataFormat of resulting file;
                       dataFormat needs to have writing ability (currently only Hdf5)
    :return: Spectrum instance
    :raises RuntimeError: when no dataSource is defined
    :raises ValueError: on invalid axisCode or extraction failure
    """
    if self.dataSource is None:
        text = 'No proper (filePath, dataFormat) set for %s; unable to extract slice' % self
        getLogger().error(text)
        raise RuntimeError(text)
    if axisCode is None or axisCode not in self.axisCodes:
        raise ValueError('Invalid axisCode %r, should be one of %s' % (axisCode, self.axisCodes))

    try:
        # validate the position against the slice dimension before extracting
        dimensions = self.getByAxisCodes('dimensions', [axisCode])
        self.dataSource.checkForValidSlice(position=position, sliceDim=dimensions[0])

        newSpectrum = self._extractToFile(axisCodes=[axisCode], position=position, path=path,
                                          dataFormat=dataFormat, tag='slice')

    except (ValueError, RuntimeError) as es:
        # normalise any failure into a ValueError with calling context
        text = 'Spectrum.extractSliceToFile: %s' % es
        raise ValueError(text)

    return newSpectrum
@logCommand(get='self')
def getPlaneData(self, position=None, xDim: int = 1, yDim: int = 2) -> PlaneData:
    """Get a plane defined by xDim and yDim (1-based), and a position vector (1-based)
    as a PlaneData object (i.e. a 2D numpy.ndarray). Dimensionality must be >= 2.

    :param position: A list/tuple of point-positions (1-based)
    :param xDim: Dimension of the first axis (1-based)
    :param yDim: Dimension of the second axis (1-based)
    :return: a PlaneData object (i.e. a 2D numpy.ndarray in order (yDim, xDim))
    :raises RuntimeError: if the spectrum has fewer than 2 dimensions

    NB: use getPlane() method for axisCode based access
    """
    if self.dimensionCount < 2:
        raise RuntimeError("Spectrum.getPlaneData: dimensionality < 2")

    if self.dataSource is None:
        # no data on disk: return a zero-filled plane of the right shape
        getLogger().warning('No proper (filePath, dataFormat) set for %s; Returning zeros only' % self)
        return PlaneData(shape=(self.pointCounts[yDim - 1], self.pointCounts[xDim - 1]),
                         dimensions=(xDim, yDim),
                         position=[1] * self.dimensionCount
                         )

    try:
        position = self.dataSource.checkForValidPlane(position, xDim=xDim, yDim=yDim)
    except (RuntimeError, ValueError) as es:
        getLogger().error('invalid arguments: %s' % es)
        raise es

    data = self.dataSource.getPlaneData(position=position, xDim=xDim, yDim=yDim)
    # Make a copy in order to preserve the original data and apply scaling
    data = data.copy(order='K') * self.scale

    #TODO: settle on the axisReversed issue
    # if self.axesReversed[xDim-1]:
    #     data = numpy.flip(data, axis=0)  # data are [y,x] ordered
    # if self.axesReversed[yDim-1]:
    #     data = numpy.flip(data, axis=1)  # data are [y,x] ordered

    return data
@logCommand(get='self')
def getPlane(self, axisCodes, position=None) -> PlaneData:
    """Get a plane defined by axisCodes and position as a PlaneData object
    (i.e. a 2D numpy.ndarray in order (yDim, xDim)). Dimensionality must be >= 2.

    :param axisCodes: a list/tuple of two axisCodes that define the plane dimensions
    :param position: a list/tuple of point-positions (1-based)
    :return: a PlaneData object (i.e. a 2D numpy.ndarray in order (yDim, xDim))

    NB: use getPlaneData method for dimension based access
    """
    if len(axisCodes) != 2:
        raise ValueError('Invalid axisCodes %s, len should be 2' % axisCodes)

    # map the two axisCodes onto their (1-based) dimensions and delegate
    dims = self.getByAxisCodes('dimensions', axisCodes, exactMatch=True)
    return self.getPlaneData(position=position, xDim=dims[0], yDim=dims[1])
def setPlaneData(self, data, position: Sequence = None, xDim: int = 1, yDim: int = 2):
    """Set the plane data as defined by xDim, yDim and position (all 1-based).

    :param data: A 2-dimensional float32 numpy array in order (yDim, xDim)
    :param position: A list/tuple of point-positions (1-based)
    :param xDim: Dimension of the first axis (1-based)
    :param yDim: Dimension of the second axis (1-based)
    :raises RuntimeError: if the spectrum has fewer than 2 dimensions
    """
    if self.dimensionCount < 2:
        # fixed typo in the original error message ('gstPlaneData')
        raise RuntimeError("Spectrum.setPlaneData: dimensionality < 2")

    if self.dataSource is None:
        getLogger().warning('No proper (filePath, dataFormat) set for %s; cannot set plane data' % self)
        return

    try:
        position = self.dataSource.checkForValidPlane(position, xDim=xDim, yDim=yDim)
    except (RuntimeError, ValueError) as es:
        getLogger().error('invalid arguments: %s' % es)
        raise es

    self.dataSource.setPlaneData(data=data, position=position, xDim=xDim, yDim=yDim)
@logCommand(get='self')
def extractPlaneToFile(self, axisCodes: (tuple, list), position=None, path=None, dataFormat='Hdf5'):
    """Save a plane, defined by axisCodes and position, to path using dataFormat.
    Dimensionality must be >= 2.

    :param axisCodes: tuple/list of two axisCodes
    :param position: a list/tuple of point-positions (1-based)
    :param path: path of the resulting file; auto-generated if None
    :param dataFormat: a data format valid for writing (default='Hdf5')
    :returns plane as Spectrum instance
    :raises RuntimeError: when no dataSource is defined
    :raises ValueError: on invalid axisCodes or extraction failure
    """
    if self.dataSource is None:
        text = 'No proper (filePath, dataFormat) set for %s; unable to extract plane' % self
        getLogger().error(text)
        raise RuntimeError(text)
    if axisCodes is None or len(axisCodes) != 2:
        raise ValueError('Invalid parameter axisCodes "%s", should be two of %s' % (axisCodes, self.axisCodes))

    try:
        # validate the position against the requested plane before extracting
        dimensions = self.getByAxisCodes('dimensions', axisCodes)
        self.dataSource.checkForValidPlane(position=position, xDim=dimensions[0], yDim=dimensions[1])

        newSpectrum = self._extractToFile(axisCodes=axisCodes, position=position, path=path,
                                          dataFormat=dataFormat, tag='plane')

    except (ValueError, RuntimeError) as es:
        # normalise any failure into a ValueError with calling context
        text = 'Spectrum.extractPlaneToFile: %s' % es
        raise ValueError(text)

    return newSpectrum
@logCommand(get='self')
def getProjection(self, axisCodes: (tuple, list), method: str = 'max', threshold=None) -> PlaneData:
    """Get the projected plane defined by two axisCodes, using method and an
    optional threshold.

    :param axisCodes: tuple/list of two axisCodes; expand if exactMatch=False
    :param method: 'max', 'max above threshold', 'min', 'min below threshold',
                   'sum', 'sum above threshold', 'sum below threshold'
    :param threshold: threshold value for relevant method
    :return: projected spectrum data as a PlaneData object
             (i.e. a 2D numpy.ndarray in order (yDim, xDim))
    """
    # delegate the actual projection to the library helper
    return _getProjection(self, axisCodes=axisCodes, method=method, threshold=threshold)
@logCommand(get='self')
def extractProjectionToFile(self, axisCodes: (tuple, list), method: str = 'max', threshold=None,
                            path=None, dataFormat='Hdf5'):
    """Save a projected plane, defined by axisCodes and position, using method and an
    optional threshold, to path using dataFormat. Dimensionality must be >= 2.

    :param axisCodes: tuple/list of two axisCodes
    :param method: 'max', 'max above threshold', 'min', 'min below threshold',
                   'sum', 'sum above threshold', 'sum below threshold'
    :param threshold: threshold value for relevant method
    :param path: path of the resulting file; auto-generated if None
    :param dataFormat: a data format valid for writing
    :returns projected plane as Spectrum instance
    :raises RuntimeError: when no dataSource is defined
    :raises ValueError: on invalid axisCodes or extraction failure
    """
    if self.dataSource is None:
        text = 'No proper (filePath, dataFormat) set for %s; unable to extract plane' % self
        getLogger().error(text)
        raise RuntimeError(text)
    if axisCodes is None or len(axisCodes) != 2:
        raise ValueError('Invalid parameter axisCodes "%s", should be two of %s' % (axisCodes, self.axisCodes))

    try:
        xDim, yDim = self.getByAxisCodes('dimensions', axisCodes)
        # the projection is extracted at the first point of the remaining dimensions
        position = [1] * self.dimensionCount
        self.dataSource.checkForValidPlane(position=position, xDim=xDim, yDim=yDim)

        # create the (empty) target spectrum, then compute and write the projected data
        newSpectrum = self._extractToFile(axisCodes=axisCodes, position=position, path=path,
                                          dataFormat=dataFormat, tag='projection')
        projectionData = self.getProjection(axisCodes=axisCodes, method=method, threshold=threshold)

        # For writing, we need to remap the axisCodes onto the newSpectrum
        xDim2, yDim2 = newSpectrum.getByAxisCodes('dimensions', axisCodes)
        newSpectrum.dataSource.setPlaneData(data=projectionData, position=position,
                                            xDim=xDim2, yDim=yDim2)

    except (ValueError, RuntimeError) as es:
        # normalise any failure into a ValueError with calling context
        text = 'Spectrum.extractProjectionToFile: %s' % es
        raise ValueError(text)

    return newSpectrum
@logCommand(get='self')
def cloneAsHdf5(self, path=None, suffix='_cloned'):
    """Clone self and all peakLists as an Hdf5 type file.

    :param path: optional target path; auto-generated next to the current data if None
    :param suffix: suffix appended to self.name to form the clone's name
    :return: a Spectrum instance of the cloned spectrum
    :raises RuntimeError: when self has no valid filePath
    """
    from ccpn.core.lib.SpectrumDataSources.Hdf5SpectrumDataSource import Hdf5SpectrumDataSource

    if not self.hasValidPath():
        raise RuntimeError('Not valid path for %s ' % self)

    name = self.name + suffix
    if not path:
        path = self.dataSource.parentPath / name
    suffix = Hdf5SpectrumDataSource.suffixes[0]
    dataFormat = Hdf5SpectrumDataSource.dataFormat
    dataStore = DataStore.newFromPath(path=path, autoVersioning=True, withSuffix=suffix, dataFormat=dataFormat)

    # Duplicate the data in an Hdf5 file
    hdf5DataSource = self.dataSource.duplicateDataToHdf5(dataStore.aPath())

    # Update the dataSource parameters from self
    # hdf5DataSource.importFromSpectrum(self, includePath=False)
    # hdf5DataSource.writeParameters()

    # Create a new Spectrum instance
    newSpectrum = _newSpectrumFromDataSource(project=self.project, dataStore=dataStore,
                                             dataSource=hdf5DataSource, name=name)
    # Copy the dimensional parameters
    self._copyDimensionalParameters(self.axisCodes, newSpectrum)
    # Copy/set some more parameters (e.g. noiseLevel)
    self._copyNonDimensionalParameters(newSpectrum)
    newSpectrum._updateParameterValues()

    # Copy the peakList/peaks
    for idx, pl in enumerate(self.peakLists):
        # ensure the clone has a target peakList at index idx
        # (the original test 'idx+1 < len(...)' was inverted and could index
        #  past the end of newSpectrum.peakLists)
        while len(newSpectrum.peakLists) < idx + 1:
            newSpectrum.newPeakList()
        targetPl = newSpectrum.peakLists[idx]
        pl.copyTo(targetSpectrum=newSpectrum, targetPeakList=targetPl)

    newSpectrum.appendComment('Cloned from %s' % self.name)

    return newSpectrum
def _axisDictToSliceTuples(self, axisDict) -> list:
    """Convert dict of (key,value) = (axisCode, (startPpm, stopPpm)) pairs to a list of
    (startPoint,stopPoint) sliceTuples (1-based) for each dimension.

    if (axisDict[axisCode] is None) ==> use spectrumLimits
    if (startPpm is None) ==> point=1
    if (stopPpm is None) ==> point=pointCounts[axis]

    :param axisDict: dict of (axisCode, (startPpm,stopPpm)) (key,value) pairs
    :return list of (startPoint,stopPoint) sliceTuples (1-based) for each dimension
    """
    axisCodes = [ac for ac in axisDict.keys()]

    # augment axisDict with any missing axisCodes or replace any None values with spectrumLimits
    inds = self.getByAxisCodes('dimensionIndices', axisCodes)
    for idx, ac in enumerate(self.axisCodes):
        if idx not in inds or axisDict[ac] is None:
            axisDict[ac] = self.spectrumLimits[idx]
    # NOTE(review): axisCodes added by the loop above are not present in 'inds';
    # inds.index(dimIndex) below would then raise ValueError — confirm that callers
    # always supply all axisCodes, or whether inds should be recomputed here
    axisPpms = [ppm for ppm in axisDict.values()]

    sliceTuples = [None] * self.dimensionCount
    for dimIndex, ac, dim in self.dimensionTriples:
        idx = inds.index(dimIndex)
        minPpm = min(axisPpms[idx])
        maxPpm = max(axisPpms[idx])
        # ppm axis runs backward: the high-ppm limit maps to the low point
        # NOTE(review): ppm2point is called with dimension=idx+1, where idx is the position
        # in the supplied axisDict rather than the dimension index — looks suspect when the
        # axisDict key order differs from the dimension order; confirm
        # NOTE(review): min()/max() above would raise on a None limit, so the None branches
        # below appear unreachable — confirm intended handling of (None, ppm) limits
        if maxPpm is None:
            startPoint = 1
        else:
            startPoint = int(self.ppm2point(maxPpm, dimension=idx + 1) + 0.5)
        if minPpm is None:
            stopPoint = self.pointCounts[dimIndex]
        else:
            stopPoint = int(self.ppm2point(minPpm, dimension=idx + 1) + 0.5)
        sliceTuples[dimIndex] = tuple(sorted((startPoint, stopPoint)))

    getLogger().debug('Spectrum._axisDictToSliceTuples: axisDict = %s; sliceTuples = %s' %
                      (axisDict, sliceTuples))
    return sliceTuples
@logCommand(get='self')
def getRegion(self, **axisDict) -> RegionData:
    """Return the region of the spectrum data defined by the axis limits in ppm as a
    RegionData object, i.e. a numpy.ndarray of the same dimensionality as the Spectrum.

    Axis limits are passed in as a dict of (axisCode, tupleLimit) key, value pairs,
    with the tupleLimit supplied as (startPpm, stopPpm) axis limits in ppm
    (lower ppm value first).

    For axisCodes that are not included in the axisDict, or whose value is None, the
    limits will be taken from the spectrum limits along the relevant axis.
    Illegal axisCodes will raise an error.

    Example axisDict:
        {'Hn': (7.0, 9.0), 'Nh': (110, 130)}

    Example calling function:
        regionData = spectrum.getRegion(**limitsDict)
        regionData = spectrum.getRegion(Hn=(7.0, 9.0), Nh=(110, 130))

    :param axisDict: dict of (axisCode, (startPpm, stopPpm)) key, value pairs
    :return: RegionData object
    """
    if not self.hasValidPath():
        raise RuntimeError('Not valid path for %s ' % self)

    # convert the ppm limits into per-dimension (1-based) point ranges and fetch the data
    sliceTuples = self._axisDictToSliceTuples(axisDict)
    return self.dataSource.getRegionData(sliceTuples,
                                         aliasingFlags=[1] * self.dimensionCount)
@logCommand(get='self')
def createPeak(self, peakList=None, height=None, **ppmPositions) -> Optional['Peak']:
    """Create and return a peak at the position specified by the ppmPositions dict.

    Ppm positions are passed in as a dict of (axisCode, ppmValue) key, value pairs,
    with the ppmValue supplied mapping to the closest matching axis.
    Illegal or non-matching axisCodes will return None.

    Example ppmPosition dict:
        {'Hn': 7.0, 'Nh': 110}

    Example calling function:
        >>> peak = spectrum.createPeak(**ppmPositions)
        >>> peak = spectrum.createPeak(peakList, **ppmPositions)
        >>> peak = spectrum.createPeak(peakList=peakList, Hn=7.0, Nh=110)

    :param peakList: peakList to create new peak in, or None for the last peakList
                     belonging to spectrum
    :param height: optional height of the new peak
    :param ppmPositions: dict of (axisCode, ppmValue) key, value pairs
    :return: new peak or None
    """
    from ccpn.core.lib.SpectrumLib import _createPeak

    # delegate the actual creation to the library helper
    return _createPeak(self, peakList, height=height, **ppmPositions)
@logCommand(get='self')
def pickPeaks(self, peakList=None, positiveThreshold=None, negativeThreshold=None,
              **ppmRegions) -> Tuple['Peak', ...]:
    """Pick peaks in the region defined by the ppmRegions dict.

    Ppm regions are passed in as a dict containing the axis codes and the required limits.
    Each limit is defined as a key, value pair: (str, tuple), with the tuple supplied as
    (min, max) axis limits in ppm.
    Axis codes supplied are mapped to the closest matching axis.
    Illegal or non-matching axisCodes will return None.

    Example ppmRegions dict:
        {'Hn': (7.0, 9.0), 'Nh': (110, 130)}

    Example calling function:
        >>> peaks = spectrum.pickPeaks(**ppmRegions)
        >>> peaks = spectrum.pickPeaks(peakList, **ppmRegions)
        >>> peaks = spectrum.pickPeaks(peakList=peakList, Hn=(7.0, 9.0), Nh=(110, 130))

    :param peakList: peakList to create new peak in, or None for the last peakList
                     belonging to spectrum
    :param positiveThreshold: float or None specifying the positive threshold above which
        to find peaks. if None, positive peak picking is disabled.
    :param negativeThreshold: float or None specifying the negative threshold below which
        to find peaks. if None, negative peak picking is disabled.
    :param ppmRegions: dict of (axisCode, tupleValue) key, value pairs
    :return: tuple of new Peak instances
    """
    from ccpn.core.lib.SpectrumLib import _pickPeaksByRegion

    if peakList is None:
        # default to the most recently added peakList
        peakList = self.peakLists[-1]

    # get the dimensions by mapping the keys of the ppmRegions dict
    _axisCodes = tuple([a for a in ppmRegions.keys()])
    dimensions = self.getByAxisCodes('dimensions', _axisCodes)

    # now get all other parameters in dimensions order
    axisCodes = self.getByDimensions('axisCodes', dimensions)
    ppmValues = [sorted(float(pos) for pos in region) for region in ppmRegions.values()]
    ppmValues = self.orderByDimensions(ppmValues, dimensions)  # now sorted in order of dimensions

    # build an axisDict in dimension order and convert into (1-based) point ranges
    axisDict = dict((axisCode, region) for axisCode, region in zip(axisCodes, ppmValues))
    sliceTuples = self._axisDictToSliceTuples(axisDict)

    return _pickPeaksByRegion(self,
                              sliceTuples=sliceTuples, peakList=peakList,
                              positiveThreshold=positiveThreshold,
                              negativeThreshold=negativeThreshold)
def _extractToFile(self, axisCodes, position, path, dataFormat, tag):
    """Local helper routine to prevent code duplication across extractSliceToFile,
    extractPlaneToFile, extractProjectionToFile.

    :param axisCodes: axisCodes of the dimensions to extract
    :param position: position vector (1-based)
    :param path: target path, or None to auto-generate from the current filePath
    :param dataFormat: dataFormat identifier; must have writing ability
    :param tag: short label (e.g. 'slice', 'plane', 'projection') used in the
                generated filename and comment
    :return: new Spectrum instance
    :raises ValueError: on an invalid or non-writable dataFormat
    """
    dimensions = self.getByAxisCodes('dimensions', axisCodes)

    # resolve and validate the dataFormat class
    dataFormats = getDataFormats()
    validFormats = [k.dataFormat for k in dataFormats.values() if k.hasWritingAbility]
    klass = dataFormats.get(dataFormat)
    if klass is None:
        raise ValueError('Invalid dataFormat %r; must be one of %s' % (dataFormat, validFormats))
    if not klass.hasWritingAbility:
        raise ValueError('Unable to write to dataFormat %r; must be one of %s' % (dataFormat, validFormats))
    suffix = klass.suffixes[0] if len(klass.suffixes) > 0 else '.dat'

    tagStr = '%s_%s' % (tag, '_'.join(axisCodes))
    if path is None:
        # auto-generate a name encoding the tag, axisCodes and position
        appendToFilename = '_%s_%s' % (tagStr, '_'.join([str(p) for p in position]))
        # NOTE(review): relies on (parentPath / self.name) supporting '+' concatenation
        # (i.e. a project Path class, not pathlib.Path) — confirm
        path = self.dataSource.parentPath / self.name + appendToFilename
    dataStore = DataStore.newFromPath(path=path, autoVersioning=True, withSuffix=suffix,
                                      dataFormat=klass.dataFormat)

    newSpectrum = _extractRegionToFile(self, dimensions=dimensions, position=position,
                                       dataStore=dataStore)
    # add some comment as to the origin of the data
    comment = '%s at (%s) from %s' % (tagStr, ','.join([str(p) for p in position]), self)
    newSpectrum.appendComment(comment)

    return newSpectrum
@logCommand(get='self')
def setPeakAliasing(self, peaks, aliasingIndexes, updateSpectrumAliasingIndexes=False):
    """Set the peak aliasing for a set of peaks in the spectrum.

    Peaks is an iterable of type str or Peak - bad strings are ignored.
    Core objects that are not of type Peak will raise an error.

    :param peaks: iterable of Peak instances or their pid strings
    :param aliasingIndexes: tuple/list of int of length dimensionCount
    :param updateSpectrumAliasingIndexes: True/False - also widen the spectrum
                                          aliasingLimits to include the new values
    :return:
    """
    # avoid circular import
    from ccpn.core.Peak import Peak

    # resolve pids to objects, silently dropping unresolvable strings
    if (pks := set(self.project.getByPid(pk) if isinstance(pk, str) else pk for pk in peaks) - {None}):
        for pk in pks:
            if not isinstance(pk, Peak):
                raise ValueError('Spectrum.setPeakAliasing: peaks must all be of type Peak')
            if not any(pk in pkList.peaks for pkList in self.peakLists):
                raise ValueError('Spectrum.setPeakAliasing: peaks must belong to one of spectrum.peakLists')

        # fixed boolean precedence: the original 'not isinstance(...) and len(...) == n'
        # never rejected correctly-sized non-sequences nor wrongly-sized sequences
        if not (isinstance(aliasingIndexes, (tuple, list)) and len(aliasingIndexes) == self.dimensionCount):
            raise ValueError(f'Spectrum.setPeakAliasing: aliasingIndexes must be tuple/list of length {self.dimensionCount}')
        if not all(-MAXALIASINGRANGE <= aa <= MAXALIASINGRANGE for aa in aliasingIndexes):
            raise ValueError(f'Spectrum.setPeakAliasing: aliasingIndexes must be in range [{-MAXALIASINGRANGE}, {MAXALIASINGRANGE}]')
        if not isinstance(updateSpectrumAliasingIndexes, bool):
            raise ValueError('Spectrum.setPeakAliasing: updateSpectrumAliasingIndexes must be True/False')

        if updateSpectrumAliasingIndexes:
            # update the aliasing limits for the spectrum to span the new indexes
            aliasInds = self.aliasingIndexes
            folding = self.foldingLimits
            widths = self.spectralWidths

            newLimits = [(min(fold) + (min(*aa, newAl) * width), max(fold) + (max(*aa, newAl) * width))
                         for dim, (aa, fold, width, newAl) in
                         enumerate(zip(aliasInds, folding, widths, aliasingIndexes))]

            self.aliasingLimits = newLimits

        # finally apply the aliasing to each peak
        for pk in pks:
            pk.aliasing = aliasingIndexes
#----------------------------------------------------------------------------------------- # Iterators #-----------------------------------------------------------------------------------------
[docs] def allPlanes(self, axisCodes: tuple, exactMatch:bool = True): """An iterator over all planes defined by axisCodes, yielding (positions, data-array) tuples. :param axisCodes: a tuple/list of two axisCodes defining the plane :param exactMatch: match the axisCodes if True :return: iterator (position, data-array); position is a (1-based) tuple of length dimensionCount """ if len(axisCodes) != 2: raise ValueError('Invalid axisCodes %s, len should be 2' % axisCodes) axisDims = self.getByAxisCodes('dimensions', axisCodes, exactMatch=exactMatch) # check and optionally expand axisCodes if axisDims[0] == axisDims[1]: raise ValueError('Invalid axisCodes %s; identical' % axisCodes) if not self.hasValidPath(): raise RuntimeError('Not valid path for %s ' % self) return self.dataSource.allPlanes(xDim=axisDims[0], yDim=axisDims[1])
def allSlices(self, axisCode: str, exactMatch: bool = True):
    """An iterator over all slices defined by axisCode, yielding
    (positions, data-array) tuples.

    :param axisCode: an axisCode defining the slice
    :param exactMatch: match the axisCodes if True
    :return: iterator (position, data-array); position is a (1-based) tuple
             of length dimensionCount
    """
    # map the axisCode onto its (1-based) dimension
    sliceDim = self.getByAxisCodes('dimensions', [axisCode], exactMatch=exactMatch)[0]

    if not self.hasValidPath():
        raise RuntimeError('Not valid path for %s ' % self)

    return self.dataSource.allSlices(sliceDim=sliceDim)
def allPoints(self):
    """An iterator over all points, yielding (positions, pointValue) tuples.

    :return: iterator (position, pointValue); position is a (1-based) tuple
             of length dimensionCount
    """
    if not self.hasValidPath():
        raise RuntimeError('Not valid path for %s ' % self)

    return self.dataSource.allPoints()
#-----------------------------------------------------------------------------------------
# Implementation properties and functions
#-----------------------------------------------------------------------------------------

@property
def _serial(self) -> int:
    """Return the _wrappedData serial
    CCPN Internal
    """
    return self._wrappedData.serial

def _getDataSourceFromPath(self, path, dataFormat=None, checkParameters=True) -> tuple:
    """Return a (dataStore, dataSource) tuple if path points to a file compatible with
    dataFormat, or (None, None) otherwise.

    :param path: path to check, may contain redirections
    :param dataFormat: dataFormat string (default to self.dataFormat if None)
    :param checkParameters: Flag to invoke checking of parameters of dataSource
                            to match those of the self
    """
    if dataFormat is None:
        dataFormat = self.dataFormat

    dataStore = DataStore.newFromPath(path=path, dataFormat=dataFormat)
    if not dataStore.exists():
        # nothing at that path: not an error, just no result
        return (None, None)
    dataStore.spectrum = self

    try:
        dataSource = self._getDataSource(dataStore, checkParameters=checkParameters)
    except RuntimeError as es:
        # incompatible or unreadable file: report (None, None) rather than raise
        getLogger().debug('_getDataSourceFromPath: caught error: %s' % str(es))
        return (None, None)

    return (dataStore, dataSource)

def _getDataSource(self, dataStore, checkParameters=True):
    """Check the validity and access the file defined by dataStore.

    :param dataStore: A dataStore instance, defining a path, dataFormat and
                      optional buffering
    :param checkParameters: flag to check essential parameters of the dataSource
                            with the parameters of self (a Spectrum)
    :return SpectrumDataSource instance when path and/or dataFormat of the
            dataStore instance define something valid
    :raises ValueError: on an undefined/invalid dataStore or dataFormat
    :raises RuntimeError: on a missing path or incompatible data file
    """
    if dataStore is None:
        raise ValueError('dataStore is not defined')
    if not isinstance(dataStore, DataStore):
        raise ValueError(f'dataStore has invalid type; got: {dataStore}')

    if dataStore.dataFormat == EmptySpectrumDataSource.dataFormat:
        # Special case, empty spectrum; no file parameters to check
        dataSource = EmptySpectrumDataSource()
        dataSource.importFromSpectrum(self, includePath=False)
        checkParameters = False

    elif dataStore.dataFormat is None:
        # Special case, we do not (yet) know the dataFormat; this may occur due to Nef import
        # Attempt to get a dataSource from path alone
        _path = dataStore.aPath()
        if not _path.exists():
            raise RuntimeError(f'Spectrum._getDataSource: dataStore path "{_path}" does not exist')
        if (dataSource := checkPathForSpectrumFormats(_path)) is None:
            raise RuntimeError(f'Spectrum._getDataSource: error for path "{_path}" with undefined dataFormat')
        # found a valid dataSource; record the detected dataFormat
        dataStore.dataFormat = dataSource.dataFormat
        dataStore._saveInternal()

    else:
        # Regular case: We have a dataStore with a path and dataFormat
        # Check the dataFormat
        validFormats = tuple(getDataFormats().keys())
        if not isinstance(dataStore.dataFormat, str) or dataStore.dataFormat not in validFormats:
            raise ValueError('invalid dataFormat %r; should be one of %r' % (dataStore.dataFormat, validFormats))

        _path = dataStore.aPath()
        if not _path.exists():
            raise RuntimeError(f'Spectrum._getDataSource: dataStore path "{_path}" does not exist')
        if (dataSource := getSpectrumDataSource(_path, dataStore.dataFormat)) is None:
            raise RuntimeError(f'Spectrum._getDataSource: error for path "{_path}" with dataFormat "{dataStore.dataFormat}"')

    if dataStore.useBuffer:
        dataSource.setBuffering(isBuffered=True, bufferIsTemporary=True)

    if checkParameters:
        # check some fundamental parameters against those of self
        if dataSource.dimensionCount != self.dimensionCount:
            raise RuntimeError('Spectrum._getDataSource: incompatible dimensionCount (%s) of "%s"' %
                               (dataSource.dimensionCount, dataStore.aPath()))
        for idx, np in enumerate(self.pointCounts):
            if dataSource.pointCounts[idx] != np:
                raise RuntimeError('Spectrum._getDataSource: incompatible pointsCount[%s] = %s of "%s"' %
                                   (idx, dataSource.pointCounts[idx], dataStore.aPath()))
        for isC_spectrum, isC_dataSource in zip(self.isComplex, dataSource.isComplex):
            if isC_spectrum != isC_dataSource:
                raise RuntimeError('Spectrum._getDataSource: incompatible isComplex definitions; %s has %r ; %s has %r' %
                                   (self, self.isComplex, dataSource, dataSource.isComplex))

    dataSource.spectrum = self
    return dataSource

def _getPeakPicker(self):
    """Check whether a peakPicker class has been saved with this spectrum.
    Returns new peakPicker instance or None if cannot be defined
    CCPNINTERNAL: also used in SpectrumTraits._restoreFromSpectrum
    """
    from ccpn.core.lib.SpectrumLib import fetchPeakPicker

    if (peakPicker := fetchPeakPicker(self)) is not None:
        self.peakPicker = peakPicker
    return peakPicker

def _updateParameterValues(self):
    """Check, and if needed update, specific parameter values
    (noiseLevel, contour levels/colours, sliceColour).
    """
    # Quietly set some values; inactivity() suppresses notifiers/logging
    getLogger().debug2('Updating %s parameters' % self)
    with inactivity():
        # getting the noiseLevel by calling estimateNoise() if not defined
        if self.noiseLevel is None:
            self.noiseLevel = self.estimateNoise()

        # Check contourLevels, contourColours
        if self.positiveContourCount == 0 or self.negativeContourCount == 0:
            self._setDefaultContourValues()
            self._setDefaultContourColours()
        if not self.sliceColour:
            self.sliceColour = self.positiveContourColour

def _saveObject(self):
    """Update any setting before saving to API XML
    #CCPNINTERNAL: called in Project.save()
    """
    # This is needed as nef-initiated loading may have scuppered the
    # references to the internal data
    self._spectrumTraits._storeToSpectrum()

@classmethod
def _restoreObject(cls, project, apiObj):
    """Subclassed to allow for initialisations on restore, not on creation via newSpectrum
    """
    spectrum = super()._restoreObject(project, apiObj)

    # # # NOTE - version 3.0.4 -> 3.1.0 update was executed by the wrapper
    # # move parameters from _ccpnInternal to the correct namespace, delete old parameters

    # This will set all Spectrum traits, including dataStore, dataSource and peakPicker
    spectrum._spectrumTraits._restoreFromSpectrum()

    # Assure at least one peakList
    if len(spectrum.peakLists) == 0:
        spectrum.newPeakList()
        getLogger().warning('%s had no peakList; created one' % spectrum)

    # This will fix any spurious settings on the aliasing (also in update_3_0_4 code)
    # re-assigning triggers the aliasing validation/normalisation
    _aIndices = spectrum.aliasingIndices
    spectrum.aliasingIndices = _aIndices

    # Assure a setting of crucial attributes
    spectrum._updateParameterValues()
    # save the spectrum metadata
    spectrum._saveSpectrumMetaData()
    # set the initial axis ordering
    specLib._getDefaultOrdering(spectrum)

    return spectrum
@renameObject()
@logCommand(get='self')
def rename(self, value: str):
    """Rename Spectrum, changing its name and Pid.

    :param value: the new name
    :return: the result of the rename operation
    """
    # metadata is stored under the spectrum name: remove the old file,
    # rename, then re-save under the new name
    self._deleteSpectrumMetaData()
    result = self._rename(value)
    self._saveSpectrumMetaData()
    return result
@property def _metaDataPath(self): """Return the path to the metadata file """ _tmpPath = self.project.statePath.fetchDir(self._pluralLinkName) return _tmpPath / self.name + '.json' def _saveSpectrumMetaData(self): """Save the spectrum metadata in the project/state/spectra in json file for optional future reference """ try: _path = self._metaDataPath except Exception: getLogger().warning(f'{self}: Unable to save metadata; undefined path') return if not _path.parent.exists(): getLogger().warning(f'{self}: Unable to save metadata to {_path.parent}') return self._spectrumTraits.save(_path) def _restoreFromSpectrumMetaData(self): """Retore the spectrum metadata from the project/state/spectra json file """ self._spectrumTraits.restore(self._metaDataPath) self._dataStore.spectrum = self self._dataStore._saveInternal() self.dataSource.spectrum = self def _deleteSpectrumMetaData(self): """Delete the spectrum metadata in the project/state/spectra """ _path = self._metaDataPath if _path.exists(): _path.removeFile() def _finaliseAction(self, action: str): """Subclassed to handle associated spectrumViews instances """ if not super()._finaliseAction(action): return if action == 'create': # No need; done by _newSpectrum # self._saveSpectrumMetaData() pass if action == 'delete': self._deleteSpectrumMetaData() # notify peak/integral/multiplet list if action in ['create', 'delete']: for peakList in self.peakLists: peakList._finaliseAction(action) for multipletList in self.multipletLists: multipletList._finaliseAction(action) for integralList in self.integralLists: integralList._finaliseAction(action) # propagate the rename to associated spectrumViews if action in ['change']: for specView in self.spectrumViews: if specView: if self._scaleChanged: # force a rebuild of the contours/etc. 
specView.buildContoursOnly = True specView._finaliseAction(action) if self._scaleChanged: self._scaleChanged = False # notify peaks/multiplets/integrals that the scale has changed for peakList in self.peakLists: for peak in peakList.peaks: peak._finaliseAction(action) for multipletList in self.multipletLists: for multiplet in multipletList.multiplets: multiplet._finaliseAction(action) for integralList in self.integralLists: for integral in integralList.integrals: integral._finaliseAction(action) # from ccpn.ui.gui.lib.OpenGL.CcpnOpenGL import GLNotifier # # GLSignals = GLNotifier(parent=self) # GLSignals.emitPaintEvent() @cached.clear(_REFERENCESUBSTANCESCACHE) def _clearCache(self): """Clear the cache """ if self.dataSource is not None: self.dataSource.clearCache() def _close(self): """Close any open dataSource CCPNINTERNAL: also called by Project.close() to do cleanup """ self._clearCache() if self.dataSource is not None: self._spectrumTraits.dataSource.closeFile() self._spectrumTraits.dataSource = None
    @logCommand(get='self')
    def delete(self):
        """Delete Spectrum"""
        with undoBlock():
            # self._deleteSpectrumMetaData()
            # close any open dataSource file before removing the wrapped object
            self._close()
            # self.deleteAllNotifiers()  TODO: no longer required?

            # handle spectrumView ordering - this should be moved to spectrumView or spectrumDisplay via notifier?
            specDisplays = []
            specViews = []
            for sp in self.spectrumViews:
                if sp._parent.spectrumDisplay not in specDisplays:
                    specDisplays.append(sp._parent.spectrumDisplay)
                specViews.append((sp._parent, sp._parent.spectrumViews.index(sp)))

            listsToDelete = tuple(self.peakLists)
            listsToDelete += tuple(self.integralLists)
            listsToDelete += tuple(self.multipletLists)
            # delete the connected lists, should undo in the correct order
            for obj in listsToDelete:
                obj._delete()

            with undoStackBlocking() as addUndoItem:
                # notify spectrumViews of delete/create
                addUndoItem(undo=partial(self._notifySpectrumViews, 'create'),
                            redo=partial(self._notifySpectrumViews, 'delete'))

            # delete the _wrappedData
            self._delete()
            # with undoStackBlocking() as addUndoItem:
            #     # notify spectrumViews of delete
            #     addUndoItem(redo=self._finaliseSpectrumViews, '')
            # for sd in specViews:
            #     sd[0]._removeOrderedSpectrumViewIndex(sd[1])

    def _deleteChild(self, child):
        """Delete child object
        child is Pid or V3 object
        If child exists and is a valid child then delete otherwise log a warning
        """
        # resolve a Pid string to its V3 object; pass objects through unchanged
        child = self.project.getByPid(child) if isinstance(child, str) else child
        if child and child in self._getChildrenByClass(child):
            # only delete objects that are valid children - calls private _delete
            # so no infinite loop with baseclass delete
            child._delete()
        elif child:
            getLogger().warning(f'{child} not deleted - not child of {self}')
        else:
            getLogger().warning(f'{child} not deleted')

    def _deletePeakList(self, child):
        """Delete child object and ensure that there always exists at least one peakList
        """
        with undoBlock():
            self._deleteChild(child)
            if not self.peakLists:
                # if there are no peakLists then create another (there must always be one)
                self.newPeakList()

    #-----------------------------------------------------------------------------------------
    # CCPN properties and functions
    #-----------------------------------------------------------------------------------------

    def _resetPeakLists(self):
        """Delete autocreated peaklists and reset
        CCPN Internal - not for general use
        required by nef importer
        """
        for peakList in list(self.peakLists):
            # overrides spectrum contains at least one peakList
            self._deleteChild(peakList)
        # NOTE(review): pokes the api serial dict so re-created peakLists restart
        # numbering from 1 — relies on api internals; confirm against model code
        self._wrappedData.__dict__['_serialDict']['peakLists'] = 0

    @property
    def _apiDataSource(self) -> Nmr.DataSource:
        """ CCPN DataSource matching Spectrum"""
        return self._wrappedData

    @property
    def _key(self) -> str:
        """name, regularised as used for id"""
        return self._wrappedData.name.translate(Pid.remapSeparators)

    @property
    def _localCcpnSortKey(self) -> Tuple:
        """Local sorting key, in context of parent.
        """
        # sort by experiment serial first, then dataSource serial within the experiment
        return (self._wrappedData.experiment.serial, self._wrappedData.serial)

    @classmethod
    def _getAllWrappedData(cls, parent: Project) -> list:
        """get wrappedData (Nmr.DataSources) for all Spectrum children of parent Project"""
        return list(x for y in parent._wrappedData.sortedExperiments() for x in y.sortedDataSources())

    def _notifySpectrumViews(self, action):
        # fan the finalise notification out to every attached spectrumView
        for sv in self.spectrumViews:
            sv._finaliseAction(action)

    #-----------------------------------------------------------------------------------------
    # new'Object' and other methods
    # Call appropriate routines in their respective locations
    #-----------------------------------------------------------------------------------------
[docs] @logCommand(get='self') def newPeakList(self, title: str = None, comment: str = None, isSimulated: bool = False, symbolStyle: str = None, symbolColour: str = None, textColour: str = None, **kwds): """Create new empty PeakList within Spectrum See the PeakList class for details. Optional keyword arguments can be passed in; see PeakList._newPeakList for details. :param title: :param comment: :param isSimulated: :param symbolStyle: :param symbolColour: :param textColour: :return: a new PeakList attached to the spectrum. """ from ccpn.core.PeakList import _newPeakList return _newPeakList(self, title=title, comment=comment, isSimulated=isSimulated, symbolStyle=symbolStyle, symbolColour=symbolColour, textColour=textColour, **kwds)
[docs] @logCommand(get='self') def newIntegralList(self, title: str = None, symbolColour: str = None, textColour: str = None, comment: str = None, **kwds): """Create new IntegralList within Spectrum. See the IntegralList class for details. Optional keyword arguments can be passed in; see IntegralList._newIntegralList for details. :param self: :param title: :param symbolColour: :param textColour: :param comment: :return: a new IntegralList attached to the spectrum. """ from ccpn.core.IntegralList import _newIntegralList return _newIntegralList(self, title=title, comment=comment, symbolColour=symbolColour, textColour=textColour, **kwds)
[docs] @logCommand(get='self') def newMultipletList(self, title: str = None, symbolColour: str = None, textColour: str = None, lineColour: str = None, multipletAveraging=None, comment: str = None, multiplets: Sequence[Union['Multiplet', str]] = None, **kwds): """Create new MultipletList within Spectrum. See the MultipletList class for details. Optional keyword arguments can be passed in; see MultipletList._newMultipletList for details. :param title: title string :param symbolColour: :param textColour: :param lineColour: :param multipletAveraging: :param comment: optional comment string :param multiplets: optional list of multiplets as objects or pids :return: a new MultipletList attached to the Spectrum. """ from ccpn.core.MultipletList import _newMultipletList return _newMultipletList(self, title=title, comment=comment, lineColour=lineColour, symbolColour=symbolColour, textColour=textColour, multipletAveraging=multipletAveraging, multiplets=multiplets, **kwds)
[docs] @logCommand(get='self') def newSpectrumHit(self, substanceName: str, pointNumber: int = 0, pseudoDimensionNumber: int = 0, pseudoDimension=None, figureOfMerit: float = None, meritCode: str = None, normalisedChange: float = None, isConfirmed: bool = None, concentration: float = None, concentrationError: float = None, concentrationUnit: str = None, comment: str = None, **kwds): """Create new SpectrumHit within Spectrum. See the SpectrumHit class for details. Optional keyword arguments can be passed in; see SpectrumHit._newSpectrumHit for details. :param substanceName: :param pointNumber: :param pseudoDimensionNumber: :param pseudoDimension: :param figureOfMerit: :param meritCode: :param normalisedChange: :param isConfirmed: :param concentration: :param concentrationError: :param concentrationUnit: :param comment: optional comment string :return: a new SpectrumHit instance. """ from ccpn.core.SpectrumHit import _newSpectrumHit return _newSpectrumHit(self, substanceName=substanceName, pointNumber=pointNumber, pseudoDimensionNumber=pseudoDimensionNumber, pseudoDimension=pseudoDimension, figureOfMerit=figureOfMerit, meritCode=meritCode, normalisedChange=normalisedChange, isConfirmed=isConfirmed, concentration=concentration, concentrationError=concentrationError, concentrationUnit=concentrationUnit, comment=comment, **kwds)
#----------------------------------------------------------------------------------------- # Output, printing, etc #----------------------------------------------------------------------------------------- def __str__(self): return '<%s (%s); %dD (%s)>' % (self.pid, self.dataFormat, self.dimensionCount, ','.join(self.axisCodes)) def _infoString(self, includeDimensions=False): """Return info string about self, optionally including dimensional parameters """ string = '================= %s =================\n' % self string += 'path = %s\n' % self.filePath string += 'dataFormat = %s\n' % self.dataFormat string += 'dimensions = %s\n' % self.dimensionCount string += 'sizes = (%s)\n' % ' x '.join([str(d) for d in self.pointCounts]) for attr in """ scale noiseLevel experimentName """.split(): value = getattr(self, attr) string += '%s = %s\n' % (attr, value) if includeDimensions: string += '\n' for attr in """ dimensions axisCodes pointCounts isComplex dimensionTypes isotopeCodes measurementTypes spectralWidths spectralWidthsHz spectrometerFrequencies referencePoints referenceValues axisUnits foldingModes windowFunctions lorentzianBroadenings gaussianBroadenings phases0 phases1 assignmentTolerances """.split(): values = getattr(self, attr) string += '%-25s: %s\n' % (attr, ' '.join(['%-20s' % str(v) for v in values]) ) return string
[docs] def printParameters(self, includeDimensions=True): """Print the info string """ print(self._infoString(includeDimensions))
#=========================================================================================
# New and empty spectra
#=========================================================================================

# Hack; remove the api notifier on create
# _notifiers = [nf for nf in Project._apiNotifiers if nf[3] == '__init__' and 'cls' in nf[1] and nf[1]['cls'] == Spectrum]
# if len(_notifiers) == 1:
#     Project._apiNotifiers.remove(_notifiers[0])


@newObject(Spectrum)
def _newSpectrumFromDataSource(project, dataStore, dataSource, name=None) -> Spectrum:
    """Create a new Spectrum instance with name using the data in dataStore and dataSource

    :param project: the Project instance to create the Spectrum in
    :param dataStore: a DataStore instance defining path and dataFormat
    :param dataSource: a SpectrumDataSource instance holding the dimensional parameters
    :param name: optional name; derived from the path when None
    :returns Spectrum instance or None on error
    """
    from ccpn.core.SpectrumReference import _newSpectrumReference

    if dataStore is None:
        raise ValueError('dataStore cannot be None')
    if dataSource is None:
        raise ValueError('dataSource cannot be None')
    if dataSource.dimensionCount == 0:
        raise ValueError('dataSource.dimensionCount = 0')

    if name is None:
        name = dataSource.nameFromPath()
    # assure unique name
    name = Spectrum._uniqueName(project=project, name=name)

    getLogger().debug('Creating Spectrum %r (%dD;%s); %s' % (
        name, dataSource.dimensionCount,
        'x'.join([str(p) for p in dataSource.pointCounts]),
        dataStore
        ))

    apiProject = project._wrappedData
    apiExperiment = apiProject.newExperiment(name=name, numDim=dataSource.dimensionCount)
    apiDataSource = apiExperiment.newDataSource(name=name,
                                                dataStore=None,
                                                numDim=dataSource.dimensionCount,
                                                dataType='processed'
                                                )

    # Done with api generation; Create the Spectrum object
    # Agggh, cannot do
    #   spectrum = Spectrum(self, apiDataSource)
    # as the object was magically already created
    # This was done by Project._newApiObject, called from Nmr.DataSource.__init__ through an api notifier.
    # This notifier is set in the AbstractWrapper class and is part of the machinery generation; i.e.
    # _linkWrapperClasses (needs overhaul!!)
    spectrum = project._data2Obj.get(apiDataSource)
    if spectrum is None:
        raise RuntimeError("something went wrong creating a new Spectrum instance")
    spectrum._apiExperiment = apiExperiment

    # initialise the dimensional SpectrumReference objects
    with inactivity():
        for dim in dataSource.dimensions:
            _newSpectrumReference(spectrum, dimension=dim, dataSource=dataSource)

    # Set the references between spectrum and dataStore
    dataStore.dataFormat = dataSource.dataFormat
    dataStore.spectrum = spectrum
    dataStore._saveInternal()
    spectrum._spectrumTraits.dataStore = dataStore
    # update dataSource with proper expanded path
    dataSource.setPath(dataStore.aPath())

    # Update all parameters from the dataSource to the Spectrum instance; retain the dataSource instance
    dataSource.exportToSpectrum(spectrum, includePath=False)
    spectrum._spectrumTraits.dataSource = dataSource

    # get a peak picker
    spectrum._getPeakPicker()

    # Quietly update some essentials
    with inactivity():
        # Link to default (i.e. first) chemicalShiftList
        spectrum.chemicalShiftList = project.chemicalShiftLists[0]

        # Assure at least one peakList
        if len(spectrum.peakLists) == 0:
            spectrum.newPeakList()

        # Hack to trigger initialisation of contours
        spectrum.positiveContourCount = 0
        spectrum.negativeContourCount = 0
        spectrum._updateParameterValues()

    spectrum._saveSpectrumMetaData()
    spectrum._setDefaultAxisOrdering()

    return spectrum


def _newEmptySpectrum(project: Project, isotopeCodes: Sequence[str], dimensionCount=None,
                      name: str = 'emptySpectrum', path=None, **parameters) -> Spectrum:
    """Creation of new Empty Spectrum;

    :param project: the Project instance
    :param isotopeCodes: per-dimension isotope codes, e.g. ('1H', '15N')
    :param dimensionCount: optional; defaults to len(isotopeCodes)
    :param name: name of the new Spectrum
    :param path: optional path; an in-memory DataStore is used when None
    :param parameters: optional (attribute, value) pairs set on the new Spectrum
    :return: Spectrum instance or None on error
    """
    if not isIterable(isotopeCodes) or len(isotopeCodes) == 0:
        raise ValueError('invalid isotopeCodes "%s"' % isotopeCodes)

    if path is None:
        dataStore = DataStore()
    else:
        dataStore = DataStore.newFromPath(path=path, dataFormat=EmptySpectrumDataSource.dataFormat)

    # Initialise a dataSource instance
    dataSource = EmptySpectrumDataSource()
    if dataSource is None:
        raise RuntimeError('Error creating empty DataSource')

    # Fill in some of the parameters
    if dimensionCount is None:
        dimensionCount = len(isotopeCodes)
    dataSource.dimensionCount = dimensionCount
    dataSource.isotopeCodes = isotopeCodes
    dataSource._setSpectralParametersFromIsotopeCodes()
    dataSource._assureProperDimensionality()
    dataSource.noiseLevel = 0.0

    spectrum = _newSpectrumFromDataSource(project, dataStore, dataSource, name)

    # Optionally update Spectrum with optional parameters and copy back to dataSource instance;
    # this allows for example to set the size
    if len(parameters) > 0:
        for param, value in parameters.items():
            if hasattr(spectrum, param):
                try:
                    setattr(spectrum, param, value)
                except AttributeError:
                    getLogger().warning(f'{spectrum}: can not set parameter "{param}" to {value}')
        dataSource.importFromSpectrum(spectrum, includePath=False)

    return spectrum


def _newHdf5Spectrum(project: Project, isotopeCodes: Sequence[str], name: str = 'hdf5Spectrum',
                     path=None, **parameters) -> Spectrum:
    """Creation of new hdf5 Spectrum (without data) at path (autogenerated temporary path when None);

    :param project: the Project instance
    :param isotopeCodes: per-dimension isotope codes, e.g. ('1H', '15N')
    :param name: name of the new Spectrum
    :param path: optional path; autogenerated inside the project when None
    :param parameters: optional (attribute, value) pairs set on the dataSource before file creation
    :return: Spectrum instance or None on error
    """
    if not isIterable(isotopeCodes) or len(isotopeCodes) == 0:
        raise ValueError('invalid isotopeCodes "%s"' % isotopeCodes)

    if path is None:
        path = Path('$INSIDE') / CCPN_SPECTRA_DIRECTORY / name
        path = path.assureSuffix(Hdf5SpectrumDataSource.suffixes[0])
    dataStore = DataStore.newFromPath(path, dataFormat=Hdf5SpectrumDataSource.dataFormat,
                                      autoVersioning=True)

    # Initialise a Hdf5 dataSource instance
    dataSource = Hdf5SpectrumDataSource()
    if dataSource is None:
        raise RuntimeError('Error creating Hdf5 DataSource')

    # Fill in some of the parameters
    dataSource.dimensionCount = len(isotopeCodes)
    dataSource.isotopeCodes = isotopeCodes
    dataSource._setSpectralParametersFromIsotopeCodes()
    dataSource._assureProperDimensionality()

    # Optionally update dataSource with parameters; e.g. to set the pointCounts, spectral parameters, etc
    if len(parameters) > 0:
        for param, value in parameters.items():
            if hasattr(dataSource, param):
                setattr(dataSource, param, value)

    # Create the file
    with dataSource.openNewFile(path=dataStore.aPath()) as hdf5File:
        hdf5File.writeParameters()

    # create a Spectrum instance
    spectrum = _newSpectrumFromDataSource(project, dataStore, dataSource, name)
    return spectrum


def _newSpectrum(project: Project, path: (str, Path), name: str = None) -> (Spectrum, None):
    """Creation of new Spectrum instance from path;

    :param project: the Project instance
    :param path: path of the spectrum data file
    :param name: optional name; derived from the path when None
    :return: Spectrum instance or None on error
    """
    logger = getLogger()

    dataStore = DataStore.newFromPath(path)
    if not dataStore.exists():
        dataStore.errorMessage()
        return None
    _path = dataStore.aPath()

    # Try to determine data format from the path and initialise a dataSource instance with parsed parameters
    dataSource = checkPathForSpectrumFormats(path=_path)
    if dataSource is None:
        logger.warning('Invalid spectrum path "%s"' % path)  # report the argument given
        return None
    dataSource.estimateNoise()

    spectrum = _newSpectrumFromDataSource(project, dataStore, dataSource, name)
    return spectrum


def _extractRegionToFile(spectrum, dimensions, position, dataStore, name=None) -> Spectrum:
    """Extract a region of spectrum, defined by dimensions, position to path defined by
    dataStore (optionally auto-generated from spectrum.path)

    :spectrum: the spectrum from which a regions is to be extracted
    :param dimensions: [dim_a, dim_b, ...]; defining dimensions to be extracted (1-based)
    :param position, spectrum position-vector of length spectrum.dimensionCount (list, tuple; 1-based)
    :param dataStore: a DataStore instance, defining the path and dataFormat of the new spectrum
    :param name: optional name for the new spectrum
    :return: a Spectrum instance
    """
    getLogger().debug('Extracting from %s, dimensions=%r, position=%r, dataStore=%s, dataFormat %r' %
                      (spectrum, dimensions, position, dataStore, dataStore.dataFormat))

    if spectrum is None or not isinstance(spectrum, Spectrum):
        raise ValueError('invalid spectrum argument %r' % spectrum)
    if spectrum.dataSource is None:
        raise RuntimeError('No proper (filePath, dataFormat) set for %s' % spectrum)

    for dim in dimensions:
        if dim < 1 or dim > spectrum.dimensionCount:
            # fixed typo in user-facing message ('dimnsion' -> 'dimension')
            raise ValueError('Invalid dimension %r in dimensions argument (%s)' % (dim, dimensions))
    dimensions = list(dimensions)  # assure a list

    position = spectrum.dataSource.checkForValidPosition(position)

    if dataStore is None:
        raise ValueError('Undefined dataStore')
    if dataStore.dataFormat is None:
        raise ValueError('Undefined dataStore.dataFormat')
    klass = getDataFormats().get(dataStore.dataFormat)
    if klass is None:
        raise ValueError('Invalid dataStore.dataFormat %r' % dataStore.dataFormat)

    # Create a dataSource object; import spectrum to initialise the dataSource values
    dataSource = klass().importFromSpectrum(spectrum=spectrum, includePath=False)

    discardedDimensions = list(set(spectrum.dimensions) - set(dimensions))
    # The dimensional parameters of spectrum were copied on initialisation
    # remap the N-axes (as defined by the N items in the dimensions argument) onto the first N axes
    # of the dataSource;
    dimensionsMap = dict(zip(dimensions + discardedDimensions, dataSource.dimensions))
    dataSource._mapDimensionalParameters(dimensionsMap=dimensionsMap)
    # Now reduce the dimensionality to the length of dimensions; i.e. the dimensionality of the
    # new spectrum
    dataSource.setDimensionCount(len(dimensions))

    # copy the data
    # indexMap = dict((k - 1, v - 1) for k, v in dimensionsMap.items())  # source -> destination
    inverseIndexMap = dict((v - 1, k - 1) for k, v in dimensionsMap.items())  # destination -> source
    readSliceDim = dimensions[0]  # first retained dimension from the original data
    writeSliceDim = 1  # which was mapped to the first dimension of the new data

    sliceTuples = [(p, p) for p in position]
    # assure full range for all dimensions, other than readSliceDim
    for dim in dimensions[1:]:
        sliceTuples[dim - 1] = (1, spectrum.pointCounts[dim - 1])

    with spectrum.dataSource.openExistingFile() as inputFile:
        with dataSource.openNewFile(path=dataStore.aPath()) as output:
            # loop over all requested slices
            for position, aliased in inputFile._selectedPointsIterator(sliceTuples=sliceTuples,
                                                                       excludeDimensions=[readSliceDim]):
                data = inputFile.getSliceData(position, readSliceDim)
                # map the input position to the output position and write the data
                outPosition = [position[inverseIndexMap[p]] for p in output.dimensionIndices]
                # print('>>>', position, outPosition)
                output.setSliceData(data, outPosition, writeSliceDim)

    # create the new Spectrum instance from the dataSource
    newSpectrum = _newSpectrumFromDataSource(project=spectrum.project, dataStore=dataStore,
                                             dataSource=dataSource, name=name)
    # copy/set some more parameters (e.g. noiseLevel)
    spectrum._copyNonDimensionalParameters(newSpectrum)
    newSpectrum._updateParameterValues()

    return newSpectrum

#
# # Additional Notifiers:
# Project._apiNotifiers.extend(
#         (
#             # GWV: not needed?
#             # ('_finaliseApiRename', {}, Nmr.DataSource._metaclass.qualifiedName(), 'setName'),
#
#             ('_notifyRelatedApiObject', {'pathToObject': 'dataSources', 'action': 'change'},
#              Nmr.Experiment._metaclass.qualifiedName(), ''),
#
#             ('_notifyRelatedApiObject', {'pathToObject': 'dataSource', 'action': 'change'},
#              Nmr.AbstractDataDim._metaclass.qualifiedName(), ''),
#
#             ('_notifyRelatedApiObject', {'pathToObject': 'dataDim.dataSource', 'action': 'change'},
#              Nmr.DataDimRef._metaclass.qualifiedName(), ''),
#
#             ('_notifyRelatedApiObject', {'pathToObject': 'experiment.dataSources', 'action': 'change'},
#              Nmr.ExpDim._metaclass.qualifiedName(), ''),
#
#             ('_notifyRelatedApiObject', {'pathToObject': 'expDim.experiment.dataSources', 'action': 'change'},
#              Nmr.ExpDimRef._metaclass.qualifiedName(), ''),
#
#             ('_notifyRelatedApiObject', {'pathToObject': 'nmrDataSources', 'action': 'change'},
#              DataLocation.AbstractDataStore._metaclass.qualifiedName(), ''),
#
#             )
#         )