"""Spectrum-related definitions, functions and utilities
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-02-15 16:33:12 +0000 (Tue, February 15, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
import collections
import math
import random
import numpy as np
import decorator
from typing import Tuple, Optional, Sequence, List
from ccpn.framework.Application import getApplication
from ccpn.core.lib.ContextManagers import notificationEchoBlocking, undoBlockWithoutSideBar
from ccpn.core.lib.AxisCodeLib import getAxisCodeMatchIndices
from ccpn.core.lib._DistanceRestraintsLib import _getBoundResonances, longRangeTransfers
from ccpn.util.Common import percentage, isIterable
from ccpn.util.Logging import getLogger
from ccpn.util.decorators import singleton
#=========================================================================================
# Dimension definitions
# Defined here to prevent cyclic import problems in other modules that need access to these definitions
#=========================================================================================
DIMENSION_TIME = 'Time'
DIMENSION_FREQUENCY = 'Frequency'
DIMENSION_SAMPLED = 'Sampled'
DIMENSIONTYPES = [DIMENSION_TIME, DIMENSION_FREQUENCY, DIMENSION_SAMPLED]
DIMENSIONFREQ = 'Freq' # GWV: not sure why this is needed, used in the model??
MAXDIM = 8 # Maximum dimensionality
X_AXIS = 0
Y_AXIS = 1
Z_AXIS = 2
A_AXIS = 3
B_AXIS = 4
C_AXIS = 5
D_AXIS = 6
E_AXIS = 7
UNDEFINED_AXIS = 8
axisNames = {X_AXIS : "x-axis", Y_AXIS: "y-axis", Z_AXIS: "z-axis", A_AXIS: "a-axis",
B_AXIS : "b-axis", C_AXIS: "c-axis", D_AXIS: "d-axis", E_AXIS: "e-axis",
UNDEFINED_AXIS: "undefined-axis"
}
INTENSITY_DIM = 0
X_DIM = 1
Y_DIM = 2
Z_DIM = 3
A_DIM = 4
B_DIM = 5
C_DIM = 6
D_DIM = 7
E_DIM = 8
UNDEFINED_DIM = 9
dimensionNames = {INTENSITY_DIM: "intensity",
X_DIM : "x-dimension", Y_DIM: "y-dimension", Z_DIM: "z-dimension", A_DIM: "a-dimension",
B_DIM : "b-dimension", C_DIM: "c-dimension", D_DIM: "d-dimension", E_DIM: "e-dimension",
UNDEFINED_DIM: "undefined-dimension"
}
X_DIM_INDEX = 0
Y_DIM_INDEX = 1
Z_DIM_INDEX = 2
A_DIM_INDEX = 3
B_DIM_INDEX = 4
C_DIM_INDEX = 5
D_DIM_INDEX = 6
E_DIM_INDEX = 7
UNDEFINED_DIM_INDEX = 8
MagnetisationTransferTuple = collections.namedtuple('MagnetisationTransferTuple', 'dimension1 dimension2 transferType isIndirect')
NoiseEstimateTuple = collections.namedtuple('NoiseEstimateTuple', 'mean std min max noiseLevel')
FOLDING_MODE_CIRCULAR = 'circular'
FOLDING_MODE_MIRROR = 'mirror'
FOLDING_MODES = (FOLDING_MODE_CIRCULAR, FOLDING_MODE_MIRROR)
WINDOW_FUNCTION_EM = 'EM'
WINDOW_FUNCTION_GM = 'GM'
WINDOW_FUNCTION_SINE = 'Sine'
WINDOW_FUNCTION_QSINE = 'squaredSine'
WINDOW_FUNCTIONS = (WINDOW_FUNCTION_EM, WINDOW_FUNCTION_GM, WINDOW_FUNCTION_SINE, WINDOW_FUNCTION_QSINE)
# These MUST match the model - ('Shift','ShiftAnisotropy','JCoupling','Rdc','TROESY','DipolarCoupling','MQShift','T1','T2','T1rho','T1zz','Time','None')
MEASUREMENT_TYPE_TIME = 'Time'
MEASUREMENT_TYPE_SHIFT = 'Shift'
MEASUREMENT_TYPES = (MEASUREMENT_TYPE_TIME, MEASUREMENT_TYPE_SHIFT, 'ShiftAnisotropy', 'JCoupling', 'Rdc', 'TROESY', 'DipolarCoupling',
'MQShift', 'T1', 'T2', 'T1rho', 'T1zz')
# Isotope-dependent assignment tolerances (in ppm)
defaultAssignmentTolerance = 0.4
isotope2Tolerance = {
'1H' : 0.03,
'13C': 0.4,
'15N': 0.4,
'19F': 0.03,
}
SPECTRUM_POSITIONS = 'positions'
SPECTRUM_INTENSITIES = 'intensities'
MAXALIASINGRANGE = 3
def getAssignmentTolerances(isotopeCode) -> float:
    """Return the assignment tolerance for isotopeCode, or defaultAssignmentTolerance if not defined
    """
    return isotope2Tolerance.get(isotopeCode, defaultAssignmentTolerance)
#=========================================================================================
# Decorators for Spectrum attributes
#=========================================================================================
@singleton
class _includeInCopyList(list):
"""Singleton class to store the attributes to be included when making a copy of object.
Attributes can be modified and can be either non-dimensional or dimension dependent,
Dynamically filled by two decorators
Stored as list of (attributeName, isMultiDimensional) tuples
"""
    def getNoneDimensional(self):
        """Return a list of non-dimensional attribute names"""
        return [attr for attr, isNd in self if not isNd]

    def getMultiDimensional(self):
        """Return a list of multi-dimensional attribute names"""
        return [attr for attr, isNd in self if isNd]
def appendItem(self, attribute, isMultidimensional):
_t = (attribute, isMultidimensional)
if _t not in self:
super().append(_t)
def _includeInCopy(func):
    """Decorator to define that a non-dimensional attribute is to be included when making a copy of the object
    """
    storage = _includeInCopyList()
    storage.appendItem(func.__name__, isMultidimensional=False)
    return func
def _includeInDimensionalCopy(func):
"""Decorator to define that a dimensional attribute is to be included when making a copy of object
"""
storage = _includeInCopyList()
storage.appendItem(func.__name__, isMultidimensional=True)
return func
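# A minimal sketch of how these two decorators are intended to be used on a
# (hypothetical) Spectrum-like class; '_ExampleSpectrum', 'scale' and 'axisCodes'
# are illustrative names only:
#
# class _ExampleSpectrum:
#     @property
#     @_includeInCopy
#     def scale(self):
#         """Non-dimensional attribute; registered for copying"""
#         return self._scale
#
#     @property
#     @_includeInDimensionalCopy
#     def axisCodes(self):
#         """Per-dimension attribute; registered for copying"""
#         return self._axisCodes
#
# _includeInCopyList().getNoneDimensional()   # includes 'scale'
# _includeInCopyList().getMultiDimensional()  # includes 'axisCodes'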
def checkSpectrumPropertyValue(iterable: bool, unique: bool = False, allowNone: bool = False, types: tuple = (),
enumerated: tuple = (), mapping=None):
"""Decorator to check values of the Spectrum class property setters
:param iterable: True, False: indicates that value should be an iterable
:param unique: True, False: indicates if iterable items should be unique
    :param allowNone: True, False: indicates if a None value is allowed
:param types: a tuple of allowed types for value; value is cast into first type
:param enumerated: a tuple/list indicating that value should be one of the items of the tuple
    :param mapping: an optional {originalValue: mappedValue} dict; applied to
                    the value or the value's items
"""
if allowNone:
types = tuple(list(types) + [type(None)])
if len(enumerated) > 0:
enumerated = list(enumerated) + [None]
if mapping is None:
mapping = {}
def checkType(obj, attributeName, value):
"""Check and optional casts value
:param obj: the object checked
:param attributeName: the attribute of object
:param value:
:return: value cast into types[0]
"""
checkedValue = value
if not allowNone and value is None:
raise ValueError('Value for "%s" of %s cannot be None)' %
(attributeName, obj))
if len(types) > 0:
if not isinstance(value, types):
typeNames = tuple(t.__name__ for t in types)
raise ValueError('Value for "%s" of %s needs to be of type %s; got %r (type %s)' %
(attributeName, obj, typeNames, value, type(value).__name__))
# cast into types[0]
if value is not None:
checkedValue = types[0](value)
return checkedValue
def checkIterable(obj, attributeName, value):
"""Check for iterable properties
"""
if not isinstance(value, (list, tuple, set)):
raise ValueError('Value for "%s" of %s needs to be one of (list, tuple, set); got %r (type %s)' %
(attributeName, obj, value, type(value).__name__))
if len(value) != obj.dimensionCount:
raise ValueError('Value for "%s" of %s needs to be an iterable of length %d; got %r' %
(attributeName, obj, obj.dimensionCount, value))
if unique:
vals = [val for val in value if val is not None]
if len(vals) != len(set(vals)):
raise ValueError('The items of "%s" of %s need to be unique; got %r' %
(attributeName, obj, value))
def checkEnumerate(obj, attributeName, value):
"""Check if values needs to be in enumerate
"""
if len(enumerated) > 0 and value not in enumerated:
raise ValueError('Value for "%s" of %s needs to be of %r; got %r' %
(attributeName, obj, enumerated, value))
return value
@decorator.decorator
def theDecorator(*args, **kwds):
func = args[0]
self = args[1]
value = args[2]
# if func.__name__ == 'measurementTypes':
# print('>>>', isIterable, types, enumerated)
if iterable:
checkIterable(self, func.__name__, value)
# check the individual elements
checkedValue = []
for idx, val in enumerate(value):
if len(mapping) > 0:
val = mapping.get(val, val)
_itemName = '%s[%d]' % (func.__name__, idx)
val = checkType(self, _itemName, val)
val = checkEnumerate(self, _itemName, val)
checkedValue.append(val)
else:
if len(mapping) > 0:
value = mapping.get(value, value)
checkedValue = checkType(self, func.__name__, value)
checkedValue = checkEnumerate(self, func.__name__, checkedValue)
return func(self, checkedValue)
return theDecorator
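# A minimal sketch of decorating a property setter with checkSpectrumPropertyValue;
# '_ExampleSpectrum' and its 'foldingModes' attribute are illustrative only:
#
# class _ExampleSpectrum:
#     dimensionCount = 2
#
#     @property
#     def foldingModes(self):
#         return self._foldingModes
#
#     @foldingModes.setter
#     @checkSpectrumPropertyValue(iterable=True, allowNone=True, types=(str,),
#                                 enumerated=FOLDING_MODES)
#     def foldingModes(self, value):
#         # value arrives checked: an iterable of length dimensionCount,
#         # each item a str from FOLDING_MODES (or None)
#         self._foldingModes = value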
#=========================================================================================
# def _oldEstimateNoiseLevel1D(x, y, factor=3):
# """
# :param x,y: spectrum.positions, spectrum.intensities
# :param factor: optional. Increase factor to increase the STD and therefore the noise level threshold
# :return: float of estimated noise threshold
# """
#
# data = np.array([x, y])
# dataStd = np.std(data)
# data = np.array(data, np.float32)
# data = data.clip(-dataStd, dataStd)
# value = factor * np.std(data)
# return value
def _oldEstimateNoiseLevel1D(y, factor=0.5):
"""
Estimates the noise threshold based on the max intensity of the first portion of the spectrum where
only noise is present. To increase the threshold value: increase the factor.
    :return: float of estimated noise threshold
"""
if y is not None:
# print('_oldEstimateNoiseLevel1D',max(y[:int(len(y)/20)]) * factor, 'STD, ')
return max(y[:int(len(y) / 20)]) * factor
else:
return 0
def _calibrateX1D(spectrum, currentPosition, newPosition):
shift = newPosition - currentPosition
spectrum.referenceValues = [spectrum.referenceValues[0] + shift]
spectrum.positions = spectrum.positions + shift
def _calibrateY1D(spectrum, currentPosition, newPosition):
shift = newPosition - currentPosition
spectrum.intensities = spectrum.intensities + shift
def _calibrateXND(spectrum, strip, currentPosition, newPosition):
# map the X change to the correct spectrum axis
spectrumReferencing = list(spectrum.referenceValues)
indices = getAxisCodeMatchIndices(strip.axisCodes, spectrum.axisCodes)
    # since we are modifying the spectrum, it must be the second argument of getAxisCodeMatchIndices
spectrumReferencing[indices[0]] = float(spectrumReferencing[indices[0]] + (newPosition - currentPosition))
spectrum.referenceValues = spectrumReferencing
def _calibrateNDAxis(spectrum, axisIndex, currentPosition, newPosition):
    # map the change to the given spectrum axis
spectrumReferencing = list(spectrum.referenceValues)
spectrumReferencing[axisIndex] = float(spectrumReferencing[axisIndex] + (newPosition - currentPosition))
spectrum.referenceValues = spectrumReferencing
def _calibrateYND(spectrum, strip, currentPosition, newPosition):
# map the Y change to the correct spectrum axis
spectrumReferencing = list(spectrum.referenceValues)
indices = getAxisCodeMatchIndices(strip.axisCodes, spectrum.axisCodes)
    # since we are modifying the spectrum, it must be the second argument of getAxisCodeMatchIndices
spectrumReferencing[indices[1]] = float(spectrumReferencing[indices[1]] + (newPosition - currentPosition))
spectrum.referenceValues = spectrumReferencing
def _set1DRawDataFromCcpnInternal(spectrum):
_positions = spectrum._getInternalParameter(SPECTRUM_POSITIONS)
_intensities = spectrum._getInternalParameter(SPECTRUM_INTENSITIES)
    if _positions is None or _intensities is None:
return
spectrum.positions = np.array(_positions)
spectrum.intensities = np.array(_intensities)
def _negLogLikelihood(deltas, queryPeakPositions, kde):
shifted = queryPeakPositions - deltas
return -kde.logpdf(shifted.T)
def align2HSQCs(refSpectrum, querySpectrum, refPeakListIdx=-1, queryPeakListIdx=-1):
# Get hold of the peakLists in the two spectra
queryPeakList = querySpectrum.peakLists[queryPeakListIdx]
refPeakList = refSpectrum.peakLists[refPeakListIdx]
# Create numpy arrays containing the peak positions of
# each peakList
refPeakPositions = np.array([peak.position for peak in refPeakList.peaks])
queryPeakPositions = np.array([peak.position for peak in queryPeakList.peaks])
# Align the two numpy arrays by centre of mass
refMean = np.mean(refPeakPositions, axis=0)
queryMean = np.mean(queryPeakPositions, axis=0)
roughShift = queryMean - refMean
shiftedQueryPeakPositions = queryPeakPositions - roughShift
# Define a log-likelihood target for fitting the query
# peak positions
from scipy.optimize import leastsq
from scipy.stats import gaussian_kde
# Create the Gaussian KDE
kde = gaussian_kde(refPeakPositions.T, bw_method=0.1)
# Get hold of the values to overlay the two spectra
shifts, status = leastsq(_negLogLikelihood, roughShift,
args=(queryPeakPositions, kde))
# Get hold of the reference values of the querySpectrum
queryRefValues = queryPeakList.spectrum.referenceValues
# Calculate the corrected reference values
correctedValues = np.array(queryRefValues) - shifts
return shifts, correctedValues
def _estimate1DSpectrumSNR(spectrum, engine='max'):
"""
:param spectrum:
:type spectrum:
:param engine: max: calculate using the max intensity of all spectrum
:type engine:
:return:
:rtype:
"""
engines = {'max': np.max, 'mean': np.mean, 'std': np.std}
if engine in engines:
func = engines.get(engine)
else:
func = np.max
        getLogger().warning('Engine not recognised. Using default')
_snr = estimateSNR(noiseLevels=[spectrum.noiseLevel, spectrum.negativeNoiseLevel],
signalPoints=[func(spectrum.intensities)])
return _snr[0]
# refSpectrum = project.spectra[]
# querySpectrum = project.spectra[]
# a = align2HSQCs(refSpectrum, querySpectrum, refPeakListIdx=-1, queryPeakListIdx=-1)
#
# for peak in querySpectrum.peakLists[-1].peaks:
# p1,p2 = peak.position[0], peak.position[1]
# p1x = p1-(a[0][0])
# p2x = p2-(a[0][1])
# peak.position = (p1x,p2x)
#------------------------------------------------------------------------------------------------------
# Spectrum projection
# GWV: Adapted from DataSource.py
#------------------------------------------------------------------------------------------------------
PROJECTION_METHODS = ('max', 'max above threshold', 'min', 'min below threshold',
'sum', 'sum above threshold', 'sum below threshold')
def _getProjection(spectrum, axisCodes: tuple, method: str = 'max', threshold=None):
"""Get projected plane defined by axisCodes using method and optional threshold
    :return: projected data array
    NB: called by Spectrum.getProjection
"""
if method not in PROJECTION_METHODS:
raise ValueError('For spectrum projection, method must be one of %s' % (PROJECTION_METHODS,))
if method.endswith('threshold') and threshold is None:
raise ValueError('For spectrum projection method "%s", threshold parameter must be defined' % (method,))
projectedData = None
for position, planeData in spectrum.allPlanes(axisCodes, exactMatch=True):
if method == 'sum above threshold' or method == 'max above threshold':
lowIndices = planeData < threshold
planeData[lowIndices] = 0
elif method == 'sum below threshold' or method == 'min below threshold':
lowIndices = planeData > -threshold
planeData[lowIndices] = 0
if projectedData is None:
# first plane
projectedData = planeData
elif method == 'max' or method == 'max above threshold':
projectedData = np.maximum(projectedData, planeData)
elif method == 'min' or method == 'min below threshold':
projectedData = np.minimum(projectedData, planeData)
else:
projectedData += planeData
return projectedData
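# A minimal sketch of projecting onto a plane; the axis codes and pid below are
# illustrative only:
#
# spectrum = project.getByPid('SP:hsqc')
# planeData = _getProjection(spectrum, axisCodes=('H', 'N'),
#                            method='max above threshold',
#                            threshold=spectrum.positiveContourBase)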
#------------------------------------------------------------------------------------------------------
# Baseline Correction for 1D spectra
# 14/2/2017
#
# Comparison of multiple algorithms:
#
# - ALS (Asymmetric Least Squares)
# - Whittaker Smooth
# - airPLS
# - arPLS
# - Lowess
# - Polynomial Fit
#
# NB: Yet to be tested: the newest algorithm found in the literature, based on machine learning:
# "Estimating complicated baselines in analytical signals using the iterative training of
# Bayesian regularized artificial neural networks." Abolfazl Valadkhani et al.
# Analytica Chimica Acta, September 2016. DOI: 10.1016/j.aca.2016.08.046
#
#------------------------------------------------------------------------------------------------------
from scipy.sparse import csc_matrix, eye, diags
from scipy import sparse
from scipy.sparse.linalg import spsolve
def als(y, lam=10 ** 2, p=0.001, nIter=10):
    """Implements an Asymmetric Least Squares (ALS) Smoothing
    baseline correction algorithm
    Eilers, P.H.C. & Boelens, H.F.M. (2005). Baseline Correction with Asymmetric Least Squares Smoothing. Unpubl. Manuscr.
    y = signal
    lam = smoothness, 10**2 ≤ λ ≤ 10**9
    p = asymmetry, 0.001 ≤ p ≤ 0.1 is a good choice for a signal with positive peaks
    nIter = number of iterations, default 10
    """
nIter = max(1, nIter)
L = len(y)
D = sparse.csc_matrix(np.diff(np.eye(L), 2))
w = np.ones(L)
for i in range(nIter):
W = sparse.spdiags(w, 0, L, L)
Z = W + lam * D.dot(D.transpose())
z = spsolve(Z, w * y)
w = p * (y > z) + (1 - p) * (y < z)
return z
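# A minimal sketch of applying als() to a synthetic 1D signal with a sloping
# baseline (all data below are synthetic):
#
# xs = np.linspace(0, 10, 1000)
# signal = np.exp(-((xs - 5) ** 2) / 0.01) + 0.1 * xs  # peak on a linear baseline
# baseline = als(signal, lam=10 ** 5, p=0.001)
# corrected = signal - baseline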
def WhittakerSmooth(y, w, lambda_, differences=1):
    """
    Whittaker Smooth algorithm
    no licence, source from web
    Penalized least squares algorithm for background fitting
    input
        y: input data (i.e. chromatogram or spectrum)
        w: binary masks (value of the mask is zero if a point belongs to a peak and one otherwise)
        lambda_: parameter that can be adjusted by the user. The larger lambda is, the smoother the resulting background
        differences: integer indicating the order of the difference penalty
    output
        the fitted background vector
    """
X = np.matrix(y)
m = X.size
    E = eye(m, format='csc')
    D = E
    for j in range(differences):
        D = D[1:] - D[:-1]  # numpy.diff() does not work with a sparse matrix; apply the difference 'differences' times
W = diags(w, 0, shape=(m, m))
A = csc_matrix(W + (lambda_ * D.T * D))
B = csc_matrix(W * X.T)
background = spsolve(A, B)
return np.array(background)
def airPLS(y, lambda_=100, porder=1, itermax=15):
    """
    airPLS algorithm
    no licence, source from web
    Adaptive iteratively reweighted penalized least squares for baseline fitting
    input
        y: input data (i.e. chromatogram or spectrum)
        lambda_: parameter that can be adjusted by the user. The larger lambda is, the smoother the resulting background, z
        porder: order of the difference penalty used in the Whittaker smoother
    output
        the fitted background vector
    """
itermax = max(1, itermax)
m = y.shape[0]
w = np.ones(m)
for i in range(1, itermax + 1):
z = WhittakerSmooth(y, w, lambda_, porder)
d = y - z
dssn = np.abs(d[d < 0].sum())
if (dssn < 0.001 * (abs(y)).sum() or i == itermax):
if (i == itermax):
getLogger().warning('max iteration reached!')
break
w[d >= 0] = 0 # d>0 means that this point is part of a peak, so its weight is set to 0 in order to ignore it
w[d < 0] = np.exp(i * np.abs(d[d < 0]) / dssn)
w[0] = np.exp(i * (d[d < 0]).max() / dssn)
w[-1] = w[0]
return z
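# A minimal sketch of airPLS on a synthetic signal with a curved baseline:
#
# xs = np.linspace(0, 10, 1000)
# signal = np.exp(-((xs - 5) ** 2) / 0.01) + 0.05 * xs ** 2  # peak on a curved baseline
# baseline = airPLS(signal, lambda_=100, porder=1, itermax=15)
# corrected = signal - baseline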
def polynomialFit(x, y, order: int = 3):
"""
polynomial Fit algorithm
:param x: x values
:param y: y values
:param order: polynomial order
:return: fitted baseline
"""
fit = np.polyval(np.polyfit(x, y, deg=order), x)
return fit
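# A minimal usage sketch, assuming x and y are 1D arrays of matching length:
#
# baseline = polynomialFit(spectrum.positions, spectrum.intensities, order=3)
# corrected = spectrum.intensities - baseline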
def arPLS(y, lambda_=5.e5, ratio=1.e-6, itermax=50):
    """
    arPLS algorithm
    Baseline correction using asymmetrically reweighted penalized least squares
    smoothing.
    http://pubs.rsc.org/en/Content/ArticleLanding/2015/AN/C4AN01061B#!divAbstract
    :param y: The 1D spectrum
    :param lambda_: (Optional) Adjusts the balance between fitness and smoothness.
                    A smaller lambda_ favours fitness.
                    Default is 5.e5.
    :param ratio: (Optional) Iteration will stop when the weights stop changing:
                  (weights_(i) - weights_(i+1)) / (weights_(i)) < ratio.
                  Default is 1.e-6.
    :returns: The smoothed baseline of y.
    """
itermax = max(1, itermax)
y = np.array(y)
N = y.shape[0]
E = eye(N, format='csc')
# numpy.diff() does not work with sparse matrix. This is a workaround.
# It creates the second order difference matrix.
# [1 -2 1 ......]
# [0 1 -2 1 ....]
# [.............]
# [.... 0 1 -2 1]
D = E[:-2] - 2 * E[1:-1] + E[2:]
H = lambda_ * D.T * D
Y = np.matrix(y)
w = np.ones(N)
for i in range(itermax + 10):
W = diags(w, 0, shape=(N, N))
Q = W + H
B = W * Y.T
z = spsolve(Q, B)
d = y - z
dn = d[d < 0.0]
m = np.mean(dn)
if np.isnan(m):
# add a tiny bit of noise to Y
y2 = y.copy()
if np.std(y) != 0.:
y2 += (np.random.random(y.size) - 0.5) * np.std(y) / 1000.
elif np.mean(y) != 0.0:
y2 += (np.random.random(y.size) - 0.5) * np.mean(y) / 1000.
else:
y2 += (np.random.random(y.size) - 0.5) / 1000.
y = y2
Y = np.matrix(y2)
W = diags(w, 0, shape=(N, N))
Q = W + H
B = W * Y.T
z = spsolve(Q, B)
d = y - z
dn = d[d < 0.0]
m = np.mean(dn)
s = np.std(dn, ddof=1)
wt = 1. / (1 + np.exp(2. * (d - (2 * s - m)) / s))
# check exit condition
condition = np.linalg.norm(w - wt) / np.linalg.norm(w)
if condition < ratio:
break
if i > itermax:
break
w = wt
return z
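# A minimal sketch of arPLS on a synthetic signal with an oscillating baseline:
#
# xs = np.linspace(0, 10, 1000)
# signal = np.exp(-((xs - 3) ** 2) / 0.02) + np.sin(xs / 5)  # peak on an oscillating baseline
# baseline = arPLS(signal, lambda_=5.e5, ratio=1.e-6)
# corrected = signal - baseline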
def arPLS_Implementation(y, lambdaValue=5.e4, maxValue=1e6, minValue=-1e6, itermax=10, interpolate=True):
    """
    Implementation of the arPLS algorithm
    :param maxValue: maximum value of the baseline noise
    :param minValue: minimum value of the baseline noise
    :param interpolate: where the peaks are, interpolate the points from their neighbours; otherwise set them to 0
    """
    lengthY = len(y)
    sparseMatrix = eye(lengthY, format='csc')
differenceMatrix = sparseMatrix[:-2] - 2 * sparseMatrix[1:-1] + sparseMatrix[2:]
H = lambdaValue * differenceMatrix.T * differenceMatrix
Y = np.matrix(y)
    w = np.ones(lengthY)
for i in range(itermax):
        W = diags(w, 0, shape=(lengthY, lengthY))  # weights on the main diagonal
Q = W + H
B = W * Y.T
z = spsolve(Q, B)
mymask = (z > maxValue) | (z < minValue)
b = np.ma.masked_where(mymask, z)
if interpolate:
c = np.interp(np.where(mymask)[0], np.where(~mymask)[0], b[np.where(~mymask)[0]])
b[np.where(mymask)[0]] = c
else:
b = np.ma.filled(b, fill_value=0)
return b
# GWV disabled 5/8/2021
# def lowess(x, y):
# """
# LOWESS (Locally Weighted Scatterplot Smoothing).
# A lowess function that outputs smoothed estimates of endog
# at the given exog values from points (exog, endog)
# To use this, you need to install statsmodels in your miniconda:
# - conda install statsmodels or pip install --upgrade --no-deps statsmodels
# """
#
# from scipy.interpolate import interp1d
# #FIXME: invalid import
# import statsmodels.api as sm
#
# # introduce some floats in our x-values
#
# # lowess will return our "smoothed" data with a y value for at every x-value
# lowess = sm.nonparametric.lowess(y, x, frac=.3)
#
# # unpack the lowess smoothed points to their values
# lowess_x = list(zip(*lowess))[0]
# lowess_y = list(zip(*lowess))[1]
#
# # run scipy's interpolation. There is also extrapolation I believe
# f = interp1d(lowess_x, lowess_y, bounds_error=False)
#
# # this generates y values for our x-values using our interpolator
# # it will MISS values outside of the x window (less than 3, greater than 33)
# # There might be a better approach, but you can run a for loop
# # and if the value is out of the range, use f(min(lowess_x)) or f(max(lowess_x))
# ynew = f(x)
# return ynew
def nmrGlueBaselineCorrector(data, wd=20):
    """
    :param data: 1D ndarray
        one-dimensional NMR data with real values (intensities)
    :param wd: float, median window size in pts
    :return: same type as data, baseline-corrected
    """
import nmrglue as ng
data = ng.process.proc_bl.baseline_corrector(data, wd=wd)
return data
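# A minimal usage sketch, assuming the nmrglue package is installed and
# spectrum.intensities is a real-valued 1D ndarray:
#
# corrected = nmrGlueBaselineCorrector(spectrum.intensities, wd=20)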
def _getDefaultApiSpectrumColours(spectrum) -> Tuple[str, str]:
"""Get the default colours from the core spectrum class
"""
# from ccpn.util.Colour import spectrumHexColours
from ccpn.ui.gui.guiSettings import getColours, SPECTRUM_HEXCOLOURS, SPECTRUM_HEXDEFAULTCOLOURS
from ccpn.util.Colour import hexToRgb, findNearestHex, invertRGBHue, rgbToHex
dimensionCount = spectrum.dimensionCount
serial = spectrum._serial
expSerial = spectrum.experiment.serial
spectrumHexColours = getColours().get(SPECTRUM_HEXCOLOURS)
spectrumHexDefaultColours = getColours().get(SPECTRUM_HEXDEFAULTCOLOURS)
# use different colour lists for 1d and Nd
if dimensionCount < 2:
colorCount = len(spectrumHexColours)
step = ((colorCount // 2 - 1) // 2)
kk = colorCount // 7
index = expSerial - 1 + step * (serial - 1)
posCol = spectrumHexColours[(kk * index + 10) % colorCount]
negCol = spectrumHexColours[((kk + 1) * index + 10) % colorCount]
else:
try:
# try and get the colourPalette number from the preferences, otherwise use 0
from ccpn.framework.Application import getApplication
colourPalette = getApplication().preferences.general.colourPalette
        except Exception:
colourPalette = 0
if colourPalette == 0:
# colours for Vicky :)
colorCount = len(spectrumHexDefaultColours)
step = ((colorCount // 2 - 1) // 2)
index = expSerial - 1 + step * (serial - 1)
posCol = spectrumHexDefaultColours[(2 * index) % colorCount]
negCol = spectrumHexDefaultColours[(2 * index + 1) % colorCount]
else:
# automatic colours
colorCount = len(spectrumHexColours)
step = ((colorCount // 2 - 1) // 2)
kk = 11 #colorCount // 11
index = expSerial - 1 + step * (serial - 1)
posCol = spectrumHexColours[(kk * index + 10) % colorCount]
# invert the colour by reversing the ycbcr palette
rgbIn = hexToRgb(posCol)
negRGB = invertRGBHue(*rgbIn)
oppCol = rgbToHex(*negRGB)
# get the nearest one in the current colour list, so colourName exists
negCol = findNearestHex(oppCol, spectrumHexColours)
return (posCol, negCol)
def getDefaultSpectrumColours(self: 'Spectrum') -> Tuple[str, str]:
    """Get default positiveContourColour, negativeContourColour for Spectrum
    (calculated by hashing spectrum properties to avoid always getting the same colours).
    Currently matches getDefaultColours in DataSource that is set through the api
    """
return _getDefaultApiSpectrumColours(self)
def get1DdataInRange(x, y, xRange):
    """
    :param x: 1D array of positions
    :param y: 1D array of intensities
    :param xRange: (minXrange, maxXrange) or None
    :return: x, y within the xRange
    """
    if xRange is None:
        return x, y
    point1, point2 = np.max(xRange), np.min(xRange)
    indices = np.where((x <= point1) & (x >= point2))
    return x[indices], y[indices]
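# A minimal usage sketch for a 1D spectrum; the ppm limits are illustrative only:
#
# xr, yr = get1DdataInRange(spectrum.positions, spectrum.intensities, xRange=(6.5, 8.5))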
def _recurseData(ii, dataList, startCondition, endCondition):
"""Iterate over the dataArray, subdividing each iteration
"""
for data in dataList:
if not data.size:
continue
# calculate the noise values
flatData = data.flatten()
SD = np.std(flatData)
        maxValue = np.max(flatData)
        minValue = np.min(flatData)
        mn = np.mean(flatData)
        noiseLevel = mn + 3.5 * SD
        if not startCondition:
            startCondition[:] = [ii, data.shape, SD, maxValue, minValue, mn, noiseLevel]
            endCondition[:] = startCondition[:]
        if SD < endCondition[2]:
            endCondition[:] = [ii, data.shape, SD, maxValue, minValue, mn, noiseLevel]
# stop iterating when all dimensions are <= 64 elements
if any(dim > 64 for dim in data.shape):
newData = [data]
for jj in range(len(data.shape)):
newData = [np.array_split(dd, 2, axis=jj) for dd in newData]
newData = [val for sublist in newData for val in sublist]
_recurseData(ii + 1, newData, startCondition, endCondition)
# keep for a minute - example of how to call _recurseData
# # iterate over the array to calculate noise at each level
# dataList = [dataArray]
# startCondition = []
# endCondition = []
# _recurseData(0, dataList, startCondition, endCondition)
#------------------------------------------------------------------------------------------------------
DEFAULTMULTIPLIER = 1.414214
DEFAULTLEVELS = 10
DEFAULTCONTOURBASE = 10000.0
def setContourLevelsFromNoise(spectrum, setNoiseLevel=True,
setPositiveContours=True, setNegativeContours=True,
useDefaultMultiplier=True, useDefaultLevels=True, useDefaultContourBase=False,
useSameMultiplier=True,
defaultMultiplier=DEFAULTMULTIPLIER, defaultLevels=DEFAULTLEVELS, defaultContourBase=DEFAULTCONTOURBASE):
"""Calculate the noise level, base contour level and positive/negative multipliers for the given spectrum
"""
# parameter error checking
if not isinstance(setNoiseLevel, bool):
raise TypeError('setNoiseLevel is not boolean.')
if not isinstance(setPositiveContours, bool):
raise TypeError('setPositiveContours is not boolean.')
if not isinstance(setNegativeContours, bool):
raise TypeError('setNegativeContours is not boolean.')
if not isinstance(useDefaultMultiplier, bool):
raise TypeError('useDefaultMultiplier is not boolean.')
if not isinstance(useDefaultLevels, bool):
raise TypeError('useDefaultLevels is not boolean.')
if not isinstance(useDefaultContourBase, bool):
raise TypeError('useDefaultContourBase is not boolean.')
if not isinstance(useSameMultiplier, bool):
raise TypeError('useSameMultiplier is not boolean.')
if not (isinstance(defaultMultiplier, float) and defaultMultiplier > 0):
raise TypeError('defaultMultiplier is not a positive float.')
if not (isinstance(defaultLevels, int) and defaultLevels > 0):
raise TypeError('defaultLevels is not a positive int.')
if not (isinstance(defaultContourBase, float) and defaultContourBase > 0):
raise TypeError('defaultContourBase is not a positive float.')
if spectrum.dimensionCount == 1:
return
# exit if nothing set
if not (setNoiseLevel or setPositiveContours or setNegativeContours):
return
    if any(x != 'Frequency' for x in spectrum.dimensionTypes):
        raise NotImplementedError("setContourLevelsFromNoise is only implemented for processed frequency spectra; dimension types were: {}".format(spectrum.dimensionTypes, ))
getLogger().info("estimating noise level for spectrum %s" % str(spectrum.pid))
if setNoiseLevel:
# get noise level using random sampling method - may be slow for large spectra
noise = getNoiseEstimate(spectrum)
base = spectrum.noiseLevel = noise.noiseLevel
else:
base = spectrum.noiseLevel
# need to generate a min/max
noise = getContourEstimate(spectrum)
# noise => noise.mean, noise.std, noise.min, noise.max, noise.noiseLevel
if useDefaultLevels:
posLevels = defaultLevels
negLevels = defaultLevels
else:
# get from the spectrum
posLevels = spectrum.positiveContourCount
negLevels = spectrum.negativeContourCount
if useDefaultMultiplier:
# use default as root2
posMult = negMult = defaultMultiplier
else:
# calculate multiplier to give contours across range of spectrum; trap base = 0
mx = noise.max
mn = noise.min
posMult = pow(abs(mx / base), 1 / posLevels) if base else 0.0
if useSameMultiplier:
negMult = posMult
else:
negMult = pow(abs(mn / base), 1 / negLevels) if base else 0.0
if setPositiveContours:
try:
spectrum.positiveContourBase = base
spectrum.positiveContourFactor = posMult
except Exception as es:
# set to defaults if an error occurs
spectrum.positiveContourBase = defaultContourBase
spectrum.positiveContourFactor = defaultMultiplier
getLogger().warning('Error setting contour levels - %s', str(es))
spectrum.positiveContourCount = posLevels
if setNegativeContours:
try:
spectrum.negativeContourBase = -base
spectrum.negativeContourFactor = negMult
except Exception as es:
# set to defaults if an error occurs
spectrum.negativeContourBase = -defaultContourBase
spectrum.negativeContourFactor = defaultMultiplier
getLogger().warning('Error setting contour levels - %s', str(es))
spectrum.negativeContourCount = negLevels
return
def getContourLevelsFromNoise(spectrum,
setPositiveContours=False, setNegativeContours=False,
useDefaultMultiplier=True, useDefaultLevels=True, useDefaultContourBase=False,
useSameMultiplier=True,
defaultMultiplier=DEFAULTMULTIPLIER, defaultLevels=DEFAULTLEVELS, defaultContourBase=DEFAULTCONTOURBASE):
"""Calculate the noise level, base contour level and positive/negative multipliers for the given spectrum
"""
# parameter error checking
if not isinstance(setPositiveContours, bool):
raise TypeError('setPositiveContours is not boolean.')
if not isinstance(setNegativeContours, bool):
raise TypeError('setNegativeContours is not boolean.')
if not isinstance(useDefaultMultiplier, bool):
raise TypeError('useDefaultMultiplier is not boolean.')
if not isinstance(useDefaultLevels, bool):
raise TypeError('useDefaultLevels is not boolean.')
if not isinstance(useDefaultContourBase, bool):
raise TypeError('useDefaultContourBase is not boolean.')
if not isinstance(useSameMultiplier, bool):
raise TypeError('useSameMultiplier is not boolean.')
if not (isinstance(defaultMultiplier, float) and defaultMultiplier > 0):
raise TypeError('defaultMultiplier is not a positive float.')
if not (isinstance(defaultLevels, int) and defaultLevels > 0):
raise TypeError('defaultLevels is not a positive int.')
if not (isinstance(defaultContourBase, float) and defaultContourBase > 0):
raise TypeError('defaultContourBase is not a positive float.')
if spectrum.dimensionCount == 1:
return [None] * 6
    if any(x != 'Frequency' for x in spectrum.dimensionTypes):
        raise NotImplementedError("getContourLevelsFromNoise is only implemented for processed frequency spectra; dimension types were: {}".format(spectrum.dimensionTypes, ))
# need to generate a min/max
noise = getContourEstimate(spectrum)
# noise => noise.mean, noise.std, noise.min, noise.max, noise.noiseLevel
if useDefaultLevels:
posLevels = defaultLevels
negLevels = defaultLevels
else:
# get from the spectrum
posLevels = spectrum.positiveContourCount
negLevels = spectrum.negativeContourCount
if useDefaultContourBase:
posBase = defaultContourBase
else:
# calculate the base levels
posBase = spectrum.noiseLevel
negBase = -posBase if posBase else 0.0
if useDefaultMultiplier:
# use default as root2
posMult = negMult = defaultMultiplier
else:
# calculate multiplier to give contours across range of spectrum; trap base = 0
mx = noise.max
mn = noise.min
posMult = pow(abs(mx / posBase), 1 / posLevels) if posBase else 0.0
if useSameMultiplier:
negMult = posMult
else:
negMult = pow(abs(mn / negBase), 1 / negLevels) if negBase else 0.0
if not setPositiveContours:
posBase = posMult = posLevels = None
if not setNegativeContours:
negBase = negMult = negLevels = None
return posBase, negBase, posMult, negMult, posLevels, negLevels
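# A minimal sketch of applying the returned levels back to a spectrum:
#
# posBase, negBase, posMult, negMult, posLevels, negLevels = \
#     getContourLevelsFromNoise(spectrum, setPositiveContours=True, setNegativeContours=True)
# if posBase is not None:
#     spectrum.positiveContourBase = posBase
#     spectrum.positiveContourFactor = posMult
#     spectrum.positiveContourCount = posLevels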
def getClippedRegion(spectrum, strip, sort=False):
    """
    Return the clipped region, bounded by (ppmPoint(1), ppmPoint(n)) in visible order.
    If sort is True, returns a tuple(tuple(minPpm, maxPpm), ...) for each region,
    else returns tuple(tuple(ppmLeft, ppmRight), ...)
    :param spectrum:
    :param strip:
    :return:
    """
# calculate the visible region
selectedRegion = [strip.getAxisRegion(0), strip.getAxisRegion(1)]
for n in strip.orderedAxes[2:]:
selectedRegion.append((n.region[0], n.region[1]))
# use the ppmArrays to get the first/last point of the data
if spectrum.dimensionCount == 1:
ppmArrays = [spectrum.getPpmArray(dimension=1)]
else:
ppmArrays = [spectrum.getPpmArray(dimension=dim) for dim in spectrum.getByAxisCodes('dimensions', strip.axisCodes)]
# clip to the ppmArrays, not taking aliased regions into account
if sort:
return tuple(tuple(sorted(np.clip(region, np.min(limits), np.max(limits)))) for region, limits in zip(selectedRegion, ppmArrays))
else:
return tuple(tuple(np.clip(region, np.min(limits), np.max(limits))) for region, limits in zip(selectedRegion, ppmArrays))
def getNoiseEstimateFromRegion(spectrum, strip):
"""
Get the noise estimate from the visible region of the strip
:param spectrum:
:param strip:
:return:
"""
# calculate the region over which to estimate the noise
sortedSelectedRegion = getClippedRegion(spectrum, strip, sort=True)
if spectrum.dimensionCount == 1:
indices = [1]
else:
indices = spectrum.getByAxisCodes('dimensions', strip.axisCodes)
regionDict = {}
for idx, ac, region in zip(indices, strip.axisCodes, sortedSelectedRegion):
regionDict[ac] = tuple(region)
# get the data
dataArray = spectrum.getRegion(**regionDict)
# calculate the noise values
    flatData = dataArray.flatten()
    std = np.std(flatData)
    maxValue = np.max(flatData)
    minValue = np.min(flatData)
    mean = np.mean(flatData)
    value = NoiseEstimateTuple(mean=mean,
                               std=std * 1.1 if std != 0 else 1.0,
                               min=minValue, max=maxValue,
                               noiseLevel=None)
# noise function is defined here, but needs cleaning up
return _noiseFunc(value)
def getSpectrumNoise(spectrum):
"""
Get the noise level for a spectrum. If the noise level is not already set it will
be set at an estimated value.
.. describe:: Input
spectrum
.. describe:: Output
Float
"""
    noise = spectrum.noiseLevel
    if noise is None:
        noise = getNoiseEstimate(spectrum).noiseLevel
        spectrum.noiseLevel = noise
    return noise
def getNoiseEstimate(spectrum):
"""Get an estimate of the noiseLevel from the spectrum
noiseLevel is calculated as abs(mean) + 3.5 * SD
Calculated from a random subset of points
"""
# NOTE:ED more detail needed
fractPerAxis = 0.04
subsetFract = 0.25
fract = 0.1
maxSamples = 10000
npts = spectrum.pointCounts
# take % of points in each axis
_fract = (fractPerAxis ** len(npts))
nsamples = min(int(np.prod(npts) * _fract), maxSamples)
nsubsets = max(1, int(nsamples * subsetFract))
with notificationEchoBlocking():
return _getNoiseEstimate(spectrum, nsamples, nsubsets, fract)
def getContourEstimate(spectrum):
"""Get an estimate of the contour settings from the spectrum
Calculated from a random subset of points
"""
fractPerAxis = 0.04
subsetFract = 0.01
fract = 0.1
maxSamples = 10000
npts = spectrum.pointCounts
# take % of points in each axis
_fract = (fractPerAxis ** len(npts))
nsamples = min(int(np.prod(npts) * _fract), maxSamples)
nsubsets = max(1, int(nsamples * subsetFract))
with notificationEchoBlocking():
return _getContourEstimate(spectrum, nsamples, nsubsets, fract)
def _noiseFunc(value):
# take the 'value' NoiseEstimateTuple and add the noiseLevel
return NoiseEstimateTuple(mean=value.mean,
std=value.std,
min=value.min, max=value.max,
noiseLevel=abs(value.mean) + 3.5 * value.std)
def _getNoiseEstimate(spectrum, nsamples=1000, nsubsets=10, fraction=0.1):
"""
Estimate the noise level for a spectrum.
    'nsamples' random samples are taken from the spectrum;
    'nsubsets' random subsets of those samples are generated, and the subset with the
    lowest standard deviation is used.
    A tuple (mean, SD, min, max, noiseLevel) is returned from that subset:
    mean is its mean, SD is its standard deviation, min/max are its minimum/maximum values,
    and noiseLevel is the estimated noise level calculated as abs(mean) + 3.5 * SD
    :param spectrum: input spectrum
    :param nsamples: number of random samples
    :param nsubsets: number of subsets
    :param fraction: subset fraction
    :return: NoiseEstimateTuple(mean, std, min, max, noiseLevel)
"""
npts = spectrum.pointCounts
if not isinstance(nsamples, int):
raise TypeError('nsamples must be an int')
if not (0 < nsamples <= np.prod(npts)):
raise ValueError(f'nsamples must be in range [1, {np.prod(npts)}]')
if not isinstance(nsubsets, int):
raise TypeError('nsubsets must be an int')
if not (0 < nsubsets <= nsamples):
# not strictly necessary but stops huge values
raise ValueError(f'nsubsets must be in range [1, {nsamples}]')
if not isinstance(fraction, float):
raise TypeError('fraction must be a float')
if not (0 < fraction <= 1.0):
        raise ValueError('fraction must be in the range (0, 1]')
# create a list of random points in the spectrum, get only points that are not nan/inf
# getPointValue is the slow bit
allPts = [[min(n - 1, int(n * random.random()) + 1) for n in npts] for i in range(nsamples)]
_list = np.array([spectrum.getPointValue(pt) for pt in allPts], dtype=np.float32)
data = _list[np.isfinite(_list)]
fails = nsamples - len(data)
if fails:
getLogger().warning(f'Attempt to access {fails} non-existent data points in spectrum {spectrum}')
# check whether there are too many bad numbers in the data
good = nsamples - fails
if good == 0:
getLogger().warning(f'Spectrum {spectrum} contains all bad points')
return NoiseEstimateTuple(mean=None, std=None, min=None, max=None, noiseLevel=1.0)
    elif good < 10:  # arbitrary minimum number of good points
getLogger().warning(f'Spectrum {spectrum} contains minimal data')
maxValue = max([abs(x) for x in data])
if maxValue > 0:
return NoiseEstimateTuple(mean=None, std=None, min=None, max=None, noiseLevel=0.1 * maxValue)
else:
return NoiseEstimateTuple(mean=None, std=None, min=None, max=None, noiseLevel=1.0)
m = max(1, int(nsamples * fraction))
def meanStd():
# take m random values from data, and return mean/SD
y = np.random.choice(data, m)
return NoiseEstimateTuple(mean=np.mean(y), std=np.std(y), min=np.min(y), max=np.max(y), noiseLevel=None)
# generate 'nsubsets' noiseEstimates and take the one with the minimum standard deviation
value = min((meanStd() for i in range(nsubsets)), key=lambda mSD: mSD.std)
value = NoiseEstimateTuple(mean=value.mean,
std=value.std * 1.1 if value.std != 0 else 1.0,
min=value.min, max=value.max,
noiseLevel=None)
value = _noiseFunc(value)
return value
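# A minimal sketch of the same subset-minimum-SD idea on a plain numpy array,
# independent of any spectrum object (synthetic data, illustrative only):
#
# data = np.random.normal(0.0, 1.0, 10000)  # stand-in for sampled intensities
# m = max(1, int(len(data) * 0.1))          # subset size for fraction = 0.1
# subsets = [np.random.choice(data, m) for _ in range(10)]
# best = min(subsets, key=np.std)           # quietest subset approximates the noise
# noiseLevel = abs(np.mean(best)) + 3.5 * np.std(best)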
def _getDefaultOrdering(spectrum):
# axisOption = spectrum.project.application.preferences.general.axisOrderingOptions
preferredAxisOrder = spectrum._preferredAxisOrdering
if preferredAxisOrder is not None:
specAxisOrder = spectrum.axisCodes
axisOrder = [specAxisOrder[ii] for ii in preferredAxisOrder]
else:
# sets the Nd default to HCN (or possibly 2d to HC)
specAxisOrder = spectrum.axisCodes
pOrder = spectrum.searchAxisCodePermutations(('H', 'C', 'N'))
if pOrder:
spectrum._preferredAxisOrdering = pOrder
axisOrder = [specAxisOrder[ii] for ii in pOrder]
            getLogger().debug('setting default axisOrdering: %s', axisOrder)
else:
# just set to the normal ordering
spectrum._preferredAxisOrdering = tuple(ii for ii in range(spectrum.dimensionCount))
axisOrder = specAxisOrder
# try permutations of repeated codes
duplicates = [('H', 'H'), ('C', 'C'), ('N', 'N')]
for dCode in duplicates:
pOrder = spectrum.searchAxisCodePermutations(dCode)
# if permutation found and matches first axis
if pOrder and pOrder[0] == 0:
spectrum._preferredAxisOrdering = pOrder
axisOrder = [specAxisOrder[ii] for ii in pOrder]
                getLogger().debug('setting duplicate axisOrdering: %s', axisOrder)
break
return axisOrder
def _getContourEstimate(spectrum, nsamples=1000, nsubsets=10, fraction=0.1):
"""
Estimate the contour levels for a spectrum.
    'nsamples' random samples are taken from the spectrum;
    'nsubsets' random subsets of those samples are generated.
    A NoiseEstimateTuple is returned with only min/max filled in, taken from the finite sampled data.
    :param spectrum: input spectrum
    :param nsamples: number of random samples
    :param nsubsets: number of subsets
    :param fraction: subset fraction
    :return: NoiseEstimateTuple(mean, std, min, max, noiseLevel)
"""
npts = spectrum.pointCounts
if not isinstance(nsamples, int):
raise TypeError('nsamples must be an int')
if not (0 < nsamples <= np.prod(npts)):
raise ValueError(f'nsamples must be in range [1, {np.prod(npts)}]')
if not isinstance(nsubsets, int):
raise TypeError('nsubsets must be an int')
if not (0 < nsubsets <= nsamples):
# not strictly necessary but stops huge values
raise ValueError(f'nsubsets must be in range [1, {nsamples}]')
if not isinstance(fraction, float):
raise TypeError('fraction must be a float')
if not (0 < fraction <= 1.0):
        raise ValueError('fraction must be in the range (0, 1]')
# create a list of random points in the spectrum, get only points that are not nan/inf
# getPointValue is the slow bit
allPts = [[min(n - 2, int(n * random.random())) for n in npts] for i in range(nsamples)]
_list = np.array([spectrum.getPointValue(pt) for pt in allPts], dtype=np.float32)
data = _list[np.isfinite(_list)]
fails = nsamples - len(data)
if fails:
getLogger().warning(f'Attempt to access {fails} non-existent data points in spectrum {spectrum}')
# check whether there are too many bad numbers in the data
good = nsamples - fails
if good == 0:
getLogger().warning(f'Spectrum {spectrum} contains all bad points')
return NoiseEstimateTuple(mean=None, std=None, min=None, max=None, noiseLevel=1.0)
    elif good < 10:  # arbitrary minimum number of good points
getLogger().warning(f'Spectrum {spectrum} contains minimal data')
maxValue = max([abs(x) for x in data])
if maxValue > 0:
return NoiseEstimateTuple(mean=None, std=None, min=None, max=None, noiseLevel=0.1 * maxValue)
else:
return NoiseEstimateTuple(mean=None, std=None, min=None, max=None, noiseLevel=1.0)
value = NoiseEstimateTuple(mean=None,
std=None,
min=np.min(data), max=np.max(data),
noiseLevel=None)
return value
def _signalToNoiseFunc(noise, signal):
snr = math.log10(abs(np.mean(signal) ** 2 / np.mean(noise) ** 2))
return snr
def estimateSignalRegion(y, nlMax=None, nlMin=None):
    if y is None:
        return 0
if nlMax is None or nlMin is None:
nlMax, nlMin = estimateNoiseLevel1D(y)
eS = np.where(y >= nlMax)
eSN = np.where(y <= nlMin)
eN = np.where((y < nlMax) & (y > nlMin))
estimatedSignalRegionPos = y[eS]
estimatedSignalRegionNeg = y[eSN]
estimatedSignalRegion = np.concatenate((estimatedSignalRegionPos, estimatedSignalRegionNeg))
estimatedNoiseRegion = y[eN]
    lengthESR = len(estimatedSignalRegion)
    lengthENR = len(estimatedNoiseRegion)
    l = min(lengthESR, lengthENR)
if l == 0:
return np.array([])
else:
noise = estimatedNoiseRegion[:l - 1]
signalAndNoise = estimatedSignalRegion[:l - 1]
signal = abs(signalAndNoise - noise)
        signal[::-1].sort()  # sort descending in place (reversed-view trick)
        noise[::1].sort()  # sort ascending in place
if hasattr(signal, 'compressed') and hasattr(noise, 'compressed'):
signal = signal.compressed() # remove the mask
noise = noise.compressed() # remove the mask
s = signal[:int(l / 2)]
n = noise[:int(l / 2)]
if len(signal) == 0:
return np.array([])
else:
return s
def estimateSNR(noiseLevels, signalPoints, factor=2.5):
    """
    SNratio = factor * (height / |NoiseMax - NoiseMin|)
    :param noiseLevels: (max, min) floats
    :param signalPoints: iterable of floats estimated to be signal or peak heights
    :param factor: default 2.5
    :return: array of snr for each point compared to the delta noise level
    """
maxNL = np.max(noiseLevels)
minNL = np.min(noiseLevels)
dd = abs(maxNL - minNL)
    pp = np.array(list(signalPoints))
    if dd != 0:
snRatios = (factor * pp) / dd
return abs(snRatios)
return [None] * len(signalPoints)
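# A minimal usage sketch with synthetic numbers:
#
# snr = estimateSNR(noiseLevels=(1200.0, -1100.0), signalPoints=(50000.0, 32000.0))
# # -> array of one SNR value per signal point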
def estimateNoiseLevel1D(y, f=10, stdFactor=0.5) -> Tuple[float, float]:
    """
    :param y: the y region of the spectrum
    :param f: percentage of the spectrum to use. If given a portion known to be just noise, set it to 100
    :param stdFactor: multiplier applied to the standard deviation when widening the initial guess; must be non-zero
    :return: tuple (float, float) of estimated noise thresholds as max and min
"""
eMax, eMin = 0, 0
if stdFactor == 0:
stdFactor = 0.01
getLogger().warning('stdFactor of value zero is not allowed.')
if y is None:
return eMax, eMin
percent = percentage(f, int(len(y)))
fy = y[:int(percent)]
stdValue = np.std(fy) * stdFactor
eMax = np.max(fy) + stdValue
eMin = np.min(fy) - stdValue
return float(eMax), float(eMin)
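# A minimal usage sketch for a 1D spectrum:
#
# nlMax, nlMin = estimateNoiseLevel1D(spectrum.intensities, f=10, stdFactor=0.5)
# signal = estimateSignalRegion(spectrum.intensities, nlMax=nlMax, nlMin=nlMin)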
def _filterROI1Darray(x, y, roi):
""" Return region included in the ROI ppm position"""
mask = _getMaskedRegionInsideLimits(x, roi)
return x[mask], y[mask]
def _getMaskedRegionInsideLimits(x, limits):
"""
Return an array of Booleans for the condition.
True if the point in the array is within the limits, False otherwise.
Limits and Array can be positives and/or negatives
"""
import numpy.ma as ma
mask = ma.masked_inside(x, *limits)
return mask.mask
def _filtered1DArray(data, ignoredRegions):
# returns an array without ignoredRegions. Used for automatic 1d peak picking
ppmValues = data[0]
masks = []
ignoredRegions = [sorted(pair, reverse=True) for pair in ignoredRegions]
for region in ignoredRegions:
mask = (ppmValues > region[0]) | (ppmValues < region[1])
masks.append(mask)
fullmask = [all(mask) for mask in zip(*masks)]
newArray = (np.ma.MaskedArray(data, mask=np.logical_not((fullmask, fullmask))))
return newArray
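# A minimal sketch of excluding (e.g. solvent) regions before 1D peak picking;
# the region limits below are illustrative only:
#
# data = np.array([spectrum.positions, spectrum.intensities])
# masked = _filtered1DArray(data, ignoredRegions=[(5.0, 4.5), (2.6, 2.4)])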
def _initExpBoundResonances(experiment):
"""
# CCPNInternal - API Level routine -
Refresh the covalently bound status for any resonances connected
via peaks in a given experiment.
"""
resonances = {}
for spectrum in experiment.dataSources:
for peakList in spectrum.peakLists:
for peak in peakList.peaks:
for peakDim in peak.peakDims:
for contrib in peakDim.peakDimContribs:
resonances[contrib.resonance] = None
for resonance in resonances.keys():
_getBoundResonances(resonance, recalculate=True)
def _setApiExpTransfers(experiment, overwrite=True):
"""
# CCPNInternal - API Level routine -
Set up the ExpTransfers for an experiment using available refExperiment
information. Boolean option to remove any existing transfers.
List of Nmr.ExpTransfers
"""
if not experiment.refExperiment:
for expTransfer in experiment.expTransfers:
expTransfer.delete()
_initExpBoundResonances(experiment)
return
if experiment.expTransfers:
if not overwrite:
return list(experiment.expTransfers)
else:
for expTransfer in experiment.expTransfers:
expTransfer.delete()
visibleSites = {}
for expDim in experiment.expDims:
for expDimRef in expDim.expDimRefs:
if not expDimRef.refExpDimRef:
continue
measurement = expDimRef.refExpDimRef.expMeasurement
if measurement.measurementType in ('Shift', 'shift', 'MQShift'):
for atomSite in measurement.atomSites:
if atomSite not in visibleSites:
visibleSites[atomSite] = []
visibleSites[atomSite].append(expDimRef)
transferDict = {}
for atomSite in visibleSites:
expDimRefs = visibleSites[atomSite]
for expTransfer in atomSite.expTransfers:
atomSiteA, atomSiteB = expTransfer.atomSites
if (atomSiteA in visibleSites) and (atomSiteB in visibleSites):
if transferDict.get(expTransfer) is None:
transferDict[expTransfer] = []
transferDict[expTransfer].extend(expDimRefs)
# Indirect transfers, e.g. Ch_hC.NOESY or H_hC.NOESY
indirectTransfers = set()
for expGraph in experiment.refExperiment.nmrExpPrototype.expGraphs:
for expTransfer in expGraph.expTransfers:
if (expTransfer not in transferDict) and \
(expTransfer.transferType in longRangeTransfers):
atomSiteA, atomSiteB = expTransfer.atomSites
if atomSiteA not in visibleSites:
for expTransferA in atomSiteA.expTransfers:
if expTransferA.transferType != 'onebond':
continue
atomSites = list(expTransferA.atomSites)
atomSites.remove(atomSiteA)
atomSiteC = atomSites[0]
if atomSiteC in visibleSites:
atomSiteA = atomSiteC
break
if atomSiteB not in visibleSites:
for expTransferB in atomSiteB.expTransfers:
if expTransferB.transferType != 'onebond':
continue
atomSites = list(expTransferB.atomSites)
atomSites.remove(atomSiteB)
atomSiteD = atomSites[0]
if atomSiteD in visibleSites:
atomSiteB = atomSiteD
break
if (atomSiteA in visibleSites) and (atomSiteB in visibleSites):
expDimRefsA = visibleSites[atomSiteA]
expDimRefsB = visibleSites[atomSiteB]
transferDict[expTransfer] = expDimRefsA + expDimRefsB
indirectTransfers.add(expTransfer)
expTransfers = []
for refTransfer in transferDict.keys():
expDimRefs = frozenset(transferDict[refTransfer])
if len(expDimRefs) == 2:
transferType = refTransfer.transferType
expTransfer = experiment.findFirstExpTransfer(expDimRefs=expDimRefs)
if expTransfer:
# normally this would not need setting
# but we renamed NOESY to through-space so this catches that situation
expTransfer.transferType = transferType
else:
expTransfer = experiment.newExpTransfer(transferType=transferType,
expDimRefs=expDimRefs)
if refTransfer in indirectTransfers:
isDirect = False
else:
isDirect = True
expTransfer.isDirect = isDirect
expTransfers.append(expTransfer)
_initExpBoundResonances(experiment)
return expTransfers
def _getAcqRefExpDimRef(refExperiment):
"""
# CCPNInternal - API Level routine -
RefExpDimRef that corresponds to acquisition dimension
"""
# get acquisition measurement
expGraph = refExperiment.nmrExpPrototype.findFirstExpGraph()
# even if there are several the acquisition dimension should be common.
ll = [(expStep.stepNumber, expStep) for expStep in expGraph.expSteps]
expSteps = [x[1] for x in sorted(ll)]
if refExperiment.isReversed:
acqMeasurement = expSteps[0].expMeasurement
else:
acqMeasurement = expSteps[-1].expMeasurement
# get RefExpDimRef that fits measurement
ll = []
for refExpDim in refExperiment.sortedRefExpDims():
for refExpDimRef in refExpDim.sortedRefExpDimRefs():
if refExpDimRef.expMeasurement is acqMeasurement:
ll.append(refExpDimRef)
if len(ll) == 1:
return ll[0]
else:
raise RuntimeError("%s has no unambiguous RefExpDimRef for acqMeasurement (%s)"
% (refExperiment, acqMeasurement))
def _getAcqExpDim(experiment, ignorePreset=False):
"""
# CCPNInternal - API Level routine -
ExpDim that corresponds to acquisition dimension. NB uses heuristics
"""
ll = experiment.findAllExpDims(isAcquisition=True)
if len(ll) == 1 and not ignorePreset:
# acquisition dimension set - return it
result = ll.pop()
else:
# no reliable acquisition dimension set
result = None
dataSources = experiment.sortedDataSources()
if dataSources:
dataSource = dataSources[0]
for ds in dataSources[1:]:
# more than one data source. Pick one of the largest.
if ds.numDim > dataSource.numDim:
dataSource = ds
# Take dimension with most points
useDim = None
currentVal = -1
for dd in dataSource.sortedDataDims():
if hasattr(dd, 'numPointsOrig'):
val = dd.numPointsOrig
else:
val = dd.numPoints
if val > currentVal:
currentVal = val
useDim = dd
if useDim is not None:
result = useDim.expDim
if result is None:
# no joy so far - just take first ExpDim
ll = experiment.sortedExpDims()
if ll:
result = ll[0]
return result
def _getRefExpDim4ExpDim(expDim):
"""
get the link between refExpDim and expDim through their children refExpDimRef and expDimRef.
"""
refExpDim = None
for expDimRef in expDim.expDimRefs:
if expDimRef.refExpDimRef:
refExpDim = expDimRef.refExpDimRef.refExpDim
break
return refExpDim
def _tempLinkRefExpDim2ExpDim(expDim, refExpDim):
"""
Temporary link expDim and refExpDim through their sorted children refExpDimRefs and expDimRefs.
The match refExpDimRef-expDimRef will be redone by isotope code.
"""
if len(expDim.sortedExpDimRefs()) == len(refExpDim.sortedRefExpDimRefs()):
for expDimRef, refExpDimRef in zip(expDim.sortedExpDimRefs(), refExpDim.sortedRefExpDimRefs()):
expDimRef.setRefExpDimRef(refExpDimRef)
def _clearLinkToRefExp(experiment):
for expDim in experiment.expDims:
refExpDim = _getRefExpDim4ExpDim(expDim)
if refExpDim:
for expDimRef in expDim.expDimRefs:
if expDimRef is not None:
if expDimRef.refExpDimRef:
expDimRef.setRefExpDimRef(None)
def _setApiRefExperiment(experiment, refExperiment):
"""
# CCPNInternal - API Level routine -
Sets the reference experiment for an existing experiment
and tries to map the ExpDims to RefExpDims appropriately.
This routine is very convoluted. Should be simplified/reimplemented.
We are only trying to make this link: expDimRef.refExpDimRef = refExpDimRef
experiment------ expDim(s)-------expDimRef(s)
*
refExperiment---refExpDim(s)----refExpDimRef(s)
"""
_clearLinkToRefExp(experiment)
experiment.setRefExperiment(refExperiment)
if refExperiment is None:
return
refExpDims = refExperiment.sortedRefExpDims()
if not refExpDims:
# Something is wrong with the reference data
return
expDims = experiment.sortedExpDims()
if not expDims:
# Something is wrong with the experiment
return
acqRefExpDim = _getAcqRefExpDimRef(refExperiment).refExpDim
acqExpDim = _getAcqExpDim(experiment, ignorePreset=True)
if ((refExpDims.index(acqRefExpDim) * 2 < len(refExpDims)) !=
(expDims.index(acqExpDim) * 2 < len(expDims))):
# acqRefExpDim and acqExpDim are at opposite ends of their
# respective lists. reverse refExpDims so that acquisition
# dimensions will more likely get mapped to each other.
refExpDims.reverse()
# Rasmus 12/7/12. Must be set to None, as otherwise it is never reset below.
# We do not want the heuristic _getAcqExpDim to override everything else.
acqExpDim = None
for expDim in expDims:
expData = []
for expDimRef in expDim.expDimRefs:
isotopes = frozenset(expDimRef.isotopeCodes)
if isotopes:
mType = expDimRef.measurementType.lower()
expData.append((mType, isotopes))
if not expData:
continue
for refExpDim in refExpDims:
refData = []
for refExpDimRef in refExpDim.refExpDimRefs:
expMeasurement = refExpDimRef.expMeasurement
isotopes = frozenset([x.isotopeCode for x in expMeasurement.atomSites])
mType = expMeasurement.measurementType.lower()
refData.append((mType, isotopes))
if expData == refData:
                # expDim.refExpDim = refExpDim  # this link no longer exists in the v3 API; set refExpDimRefs and expDimRefs directly
_tempLinkRefExpDim2ExpDim(expDim, refExpDim)
refExpDims.remove(refExpDim)
if refExpDim is acqRefExpDim:
if not acqExpDim:
expDim.isAcquisition = True
acqExpDim = expDim
break
for expDim in expDims:
if not expDim.expDimRefs:
continue
if not _getRefExpDim4ExpDim(expDim):
if len(refExpDims) > 0:
refExpDim = refExpDims.pop(0)
_tempLinkRefExpDim2ExpDim(expDim, refExpDim)
# set reference data comparison list
_refExpDim = _getRefExpDim4ExpDim(expDim)
if _refExpDim:
refExpDimRefs = list(_refExpDim.refExpDimRefs)
refData = []
for refExpDimRef in refExpDimRefs:
expMeasurement = refExpDimRef.expMeasurement
atomSites = expMeasurement.atomSites
refData.append((frozenset(x.isotopeCode for x in atomSites),
expMeasurement.measurementType.lower(),
frozenset(x.name for x in atomSites),
refExpDimRef))
# set experiment data comparison list
inData = []
for expDimRef in expDim.expDimRefs:
inData.append((frozenset(expDimRef.isotopeCodes),
expDimRef.measurementType.lower(),
                           frozenset((expDimRef.displayName,)),
expDimRef))
        # Match expDimRef to refExpDimRef: first require the full
        # (isotopeCodes, measurementType, name/displayName) tuples to agree, then
        # progressively relax the comparison, dropping names and then measurementType
for end in (-1, -2, -3):
for ii in range(len(inData) - 1, -1, -1):
for jj in range(len(refData) - 1, -1, -1):
if inData[ii][:end] == refData[jj][:end]:
expDimRef = inData[ii][-1]
expDimRef.refExpDimRef = refData[jj][-1]
expDimRef.measurementType = refData[jj][-1].expMeasurement.measurementType
del inData[ii]
del refData[jj]
break
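
# Illustrative sketch of the progressively relaxed tuple comparison used above,
# on plain data (values are made up; the trailing element stands in for the
# expDimRef/refExpDimRef object):
#
# >>> inDatum = (frozenset({'1H'}), 'shift', frozenset({'H'}), 'expDimRef')
# >>> refDatum = (frozenset({'1H'}), 'shift', frozenset({'Hn'}), 'refExpDimRef')
# >>> [end for end in (-1, -2, -3) if inDatum[:end] == refDatum[:end]]
# [-2, -3]
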
#===========================================================================================================
# Peak-related stuff
#===========================================================================================================
def _createPeak(spectrum, peakList=None, height=None, **ppmPositions) -> Optional['Peak']:
"""Create peak at position specified by the ppmPositions dict.
Return the peak created at this ppm position or None.
Ppm positions are passed in as a dict of (axisCode, ppmValue) key, value pairs
with the ppmValue supplied mapping to the closest matching axis.
Illegal or non-matching axisCodes will return None.
Example ppmPosition dict:
::
{'Hn': 7.0, 'Nh': 110}
Example calling function:
>>> peak = spectrum.createPeak(**limitsDict)
>>> peak = spectrum.createPeak(peakList, **limitsDict)
>>> peak = spectrum.createPeak(peakList=peakList, Hn=7.0, Nh=110)
:param peakList: peakList to create new peak in, or None for the last peakList belonging to spectrum
:param ppmPositions: dict of (axisCode, ppmValue) key,value pairs
:return: new peak or None
"""
with undoBlockWithoutSideBar():
axisCodes = []
_ppmPositions = []
for axis, pos in ppmPositions.items():
axisCodes.append(axis)
_ppmPositions.append(float(pos))
try:
# try and match the axis codes before creating new peakList (if required)
indices = spectrum.getByAxisCodes('dimensionIndices', axisCodes)
        except Exception:
            getLogger().warning(f'Non-matching axis codes found {axisCodes}')
            return
peakList = spectrum.project.getByPid(peakList) if isinstance(peakList, str) else peakList
if not peakList:
if spectrum.peakLists:
peakList = spectrum.peakLists[-1]
else:
# log warning that no peakList exists - this SHOULD never happen
getLogger().warning(f'Spectrum {spectrum} has no peakLists - creating new')
peakList = spectrum.newPeakList()
        # reorder the ppm values into spectrum dimension order; there should be no Nones at this point
_ppmPositions = [_ppmPositions[ii] for ii in indices]
if height is None:
height = spectrum.getHeight(_ppmPositions)
specLimits = spectrum.spectrumLimits
aliasInds = spectrum.aliasingIndexes
for dim, pos in enumerate(_ppmPositions):
# check that the picked peak lies in the bounded region of the spectrum
minSpectrumFrequency, maxSpectrumFrequency = sorted(specLimits[dim])
visibleAlias = aliasInds[dim]
regionBounds = (round(minSpectrumFrequency + visibleAlias[0] * (maxSpectrumFrequency - minSpectrumFrequency), 3),
round(minSpectrumFrequency + (visibleAlias[1] + 1) * (maxSpectrumFrequency - minSpectrumFrequency), 3))
if not (regionBounds[0] <= pos <= regionBounds[1]):
break
        else:
            # for-else: only reached when every position lies within its bounded region
            # get the list of existing peak pointPositions in this peakList
pointCounts = spectrum.pointCounts
            intPositions = [int((spectrum.ppm2point(pos, dimension=indx + 1) - 1) % npnts) + 1
                            for indx, (pos, npnts) in enumerate(zip(_ppmPositions, pointCounts))]
# get the existing peak point-positions for this list
existingPositions = [[int(pp) for pp in pk.pointPositions] for pk in peakList.peaks]
if intPositions not in existingPositions:
# add the new peak only if one doesn't exist at these pointPositions
pk = peakList.newPeak(ppmPositions=_ppmPositions, height=height)
return pk
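
# Numeric sketch of the aliasing-aware bounds check in _createPeak (illustrative
# values): with spectrum limits (6.0, 10.0) ppm and aliasingIndexes (-1, 1) for a
# dimension, peaks may be created anywhere in the tripled region:
#
# >>> minF, maxF = 6.0, 10.0
# >>> visibleAlias = (-1, 1)
# >>> (round(minF + visibleAlias[0] * (maxF - minF), 3),
# ...  round(minF + (visibleAlias[1] + 1) * (maxF - minF), 3))
# (2.0, 14.0)
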
# def _pickPeaks(spectrum, peakList=None, positiveThreshold=None, negativeThreshold=None, **ppmRegions) -> Tuple['Peak', ...]:
# """Pick peaks in the region defined by the ppmRegions dict.
#
# Ppm regions are passed in as a dict containing the axis codes and the required limits.
# Each limit is defined as a key, value pair: (str, tuple), with the tuple supplied as (min, max) axis limits in ppm.
# Axis codes supplied are mapped to the closest matching axis.
#
# Illegal or non-matching axisCodes will return None.
#
# Example ppmRegions dict:
#
# ::
#
# {'Hn': (7.0, 9.0),
# 'Nh': (110, 130)
# }
#
# Example calling function:
#
# >>> peaks = spectrum.pickPeaks(**regionsDict)
# >>> peaks = spectrum.pickPeaks(peakList, **regionsDict)
# >>> peaks = spectrum.pickPeaks(peakList=peakList, Hn=(7.0, 9.0), Nh=(110, 130))
#
# :param peakList: peakList to create new peak in, or None for the last peakList belonging to spectrum
# :param positiveThreshold: float or None specifying the positive threshold above which to find peaks.
# if None, positive peak picking is disabled.
# :param negativeThreshold: float or None specifying the negative threshold below which to find peaks.
# if None, negative peak picking is disabled.
# :param ppmRegions: dict of (axisCode, tupleValue) key, value pairs
# :return: tuple of new peaks
# """
# # local import as cycles otherwise
# from ccpn.core.Spectrum import Spectrum
#
# application = getApplication()
# preferences = application.preferences
# logger = getLogger()
#
# if spectrum is None or not isinstance(spectrum, Spectrum):
# raise ValueError('_pickPeaks: required Spectrum instance, got:%r' % spectrum)
#
# if peakList is None:
# peakList = spectrum.peakLists[-1]
#
#
# # get the dimensions by mapping the keys of the ppmRegions dict
# dimensions = spectrum.getByAxisCodes('dimensions', [a for a in ppmRegions.keys()])
# # now get all other parameters in dimensions order
# axisCodes = spectrum.getByDimensions('axisCodes', dimensions)
# ppmValues = [sorted(float(pos) for pos in region) for region in ppmRegions.values()]
# ppmValues = spectrum.orderByDimensions(ppmValues, dimensions) # sorted in order of dimensions
#
# axisDict = dict((axisCode, region) for axisCode, region in zip(axisCodes, ppmValues))
# sliceTuples = spectrum._axisDictToSliceTuples(axisDict)
#
# return _pickPeaksByRegion(spectrum, sliceTuples= sliceTuples, peakList=peakList,
# positiveThreshold=positiveThreshold, negativeThreshold=negativeThreshold)
def _pickPeaksByRegion(spectrum, sliceTuples, peakList, positiveThreshold, negativeThreshold) -> list:
"""Helper function to pick peaks of spectrum in region defined by sliceTuples
:param spectrum: a Spectrum instance
:param sliceTuples: a list of (startPoint,stopPoint) tuples (1-based, inclusive) per dimension
    :param peakList: peakList to hold the newly created peaks
:param positiveThreshold: float or None specifying the positive threshold above which to find peaks.
if None, positive peak picking is disabled.
:param negativeThreshold: float or None specifying the negative threshold below which to find peaks.
if None, negative peak picking is disabled.
:return: list of new peaks
"""
# local import as cycles otherwise
from ccpn.core.Spectrum import Spectrum
from ccpn.core.PeakList import PeakList
application = getApplication()
preferences = application.preferences
logger = getLogger()
if spectrum is None or not isinstance(spectrum, Spectrum):
raise ValueError('_pickPeaksByRegion: required Spectrum instance, got:%r' % spectrum)
if peakList is None or not isinstance(peakList, PeakList):
raise ValueError('_pickPeaksByRegion: required peakList instance, got:%r' % peakList)
# get the peakPicker
if (peakPicker := spectrum.peakPicker) is None:
        txt = f'_pickPeaksByRegion: No valid peakPicker for {spectrum}'
logger.warning(txt)
raise RuntimeError(txt)
# check the sliceTuples; make a copy as list of lists first
_sliceTuples = [list(sl) for sl in sliceTuples]
cropped = False
for dimIdx, _slice, points in zip(spectrum.dimensionIndices, _sliceTuples, spectrum.aliasingPointLimits):
        # check whether the region lies completely outside the spectrum;
        # if so, issue a warning and return
if (_slice[0] < points[0] and _slice[1] < points[0]) or \
(_slice[0] > points[1] and _slice[1] > points[1]):
# region is fully outside the spectral range
logger.debug('_pickPeaksByRegion: %s: sliceTuples[%d]=%r out of range %r' %
(spectrum, dimIdx, tuple(_slice), points))
logger.warning(f'could not pick peaks for {spectrum} in region {sliceTuples}; '\
f'dimension {dimIdx+1},"{spectrum.axisCodes[dimIdx]}" = {tuple(_slice)} out of range {points}')
return []
if _slice[0] < points[0]:
_slice[0] = points[0]
_sliceTuples[dimIdx] = _slice
cropped = True
if _slice[1] > points[1]:
_slice[1] = points[1]
_sliceTuples[dimIdx] = _slice
cropped = True
# revert to a list of tuples
_sliceTuples = [tuple(sl) for sl in _sliceTuples]
if cropped:
logger.warning('_pickPeaksByRegion: %s: cropped sliceTuples from %r to %r' %
(spectrum, sliceTuples, _sliceTuples))
# set any additional parameters from preferences
minDropFactor = preferences.general.peakDropFactor
fitMethod = preferences.general.peakFittingMethod
peakPicker.setParameters(dropFactor=minDropFactor,
fitMethod=fitMethod,
setLineWidths=True
)
peaks = []
with undoBlockWithoutSideBar():
try:
peaks = peakPicker.pickPeaks(sliceTuples=_sliceTuples,
peakList=peakList,
positiveThreshold=positiveThreshold,
negativeThreshold=negativeThreshold
)
except Exception as err:
            # trap the error that Nd spectra may not be defined in all dimensions of axisDict
logger.debug('_pickPeaks %s, trapped error: %s' % (spectrum, str(err)))
logger.warning(f'could not pick peaks for {spectrum} in region {sliceTuples}')
# if application._isInDebugMode:
# raise err
return peaks
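
# Minimal sketch of the per-dimension cropping performed above (illustrative
# values): a requested slice is clipped to the aliasingPointLimits:
#
# >>> _slice, points = [0, 2100], (1, 2048)
# >>> [max(_slice[0], points[0]), min(_slice[1], points[1])]
# [1, 2048]
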
def fetchPeakPicker(spectrum):
"""Get a peakPicker; either by restore from spectrum or the default relevant for spectrum
:return a PeakPicker instance or None on errors
"""
from ccpn.util.traits.CcpNmrJson import CcpNmrJson
from ccpn.core.Spectrum import Spectrum
from ccpn.core.lib.PeakPickers.PeakPicker1D import PeakPicker1D
from ccpn.core.lib.PeakPickers.PeakPickerNd import PeakPickerNd
from ccpn.core.lib.PeakPickers.PeakPickerABC import getPeakPickerTypes, PEAKPICKERPARAMETERS
if spectrum is None:
raise ValueError('fetchPeakPicker: spectrum is None')
if not isinstance(spectrum, Spectrum):
raise ValueError('fetchPeakPicker: spectrum is not of Spectrum class')
project = spectrum.project
application = project.application
preferences = application.preferences
peakPickers = getPeakPickerTypes()
default1DPickerType = preferences.general.peakPicker1d
    if not default1DPickerType or default1DPickerType not in peakPickers:
default1DPickerType = PeakPicker1D.peakPickerType
defaultNDPickerType = preferences.general.peakPickerNd
    if not defaultNDPickerType or defaultNDPickerType not in peakPickers:
defaultNDPickerType = PeakPickerNd.peakPickerType
_picker = None
try:
# read peakPicker from CCPNinternal
jsonString = spectrum._getInternalParameter(PEAKPICKERPARAMETERS)
_picker = CcpNmrJson.newObjectFromJson(jsonString=jsonString, spectrum=spectrum)
except Exception as es:
# No internal definition found
_pickerType = default1DPickerType if spectrum.dimensionCount == 1 else defaultNDPickerType
getLogger().debug(f'peakPicker not restored from {spectrum}; selected {_pickerType} instead')
if (cls := peakPickers.get(_pickerType)) is not None:
_picker = cls(spectrum=spectrum)
else:
            getLogger().debug(f'Failed to instantiate {_pickerType} peakPicker')
if _picker is None:
getLogger().debug(f'peakPicker for {spectrum} not defined')
return _picker
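
# Hedged usage sketch; 'SP:hsqc' is a hypothetical pid for a loaded spectrum:
#
# >>> spectrum = project.getByPid('SP:hsqc')    # doctest: +SKIP
# >>> peakPicker = fetchPeakPicker(spectrum)    # doctest: +SKIP
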
#===========================================================================================================
# Spectrum axis permutations
#===========================================================================================================
def _searchAxisCodePermutations(spectrum, checkCodes: Tuple[str, ...]) -> Optional[Tuple[int]]:
"""Generate the permutations of the current axisCodes
"""
if not checkCodes:
raise ValueError('checkCodes is not defined')
if not isinstance(checkCodes, (tuple, list)):
raise TypeError('checkCodes is not a list/tuple')
if not all(isinstance(ss, str) for ss in checkCodes):
raise TypeError('checkCodes elements must be strings')
from itertools import permutations
    # generate all permutations of the axisCodes and the matching dimension orderings
axisPerms = tuple(permutations(spectrum.axisCodes))
axisOrder = tuple(permutations(spectrum.dimensionIndices))
for ii, perm in enumerate(axisPerms):
n = min(len(checkCodes), len(perm))
if n and all(pCode[0] == cCode[0] for pCode, cCode in zip(perm[:n], checkCodes[:n])):
return axisOrder[ii]
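
# Self-contained sketch of the first-letter matching used above, on plain tuples
# (axis codes are illustrative):
#
# >>> from itertools import permutations
# >>> axisCodes, checkCodes = ('Nh', 'Hn'), ('H', 'N')
# >>> next(order for order, perm in zip(permutations(range(len(axisCodes))),
# ...                                   permutations(axisCodes))
# ...      if all(p[0] == c[0] for p, c in zip(perm, checkCodes)))
# (1, 0)
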
def _setDefaultAxisOrdering(spectrum):
"""Establish and set a preferred axis ordering, based on some default rules;
e.g. HCN for triple-resonance experiment
called once from _newSpectrum to set the preferredAxisOrdering
"""
pOrder = None
# Define the preferred orderings
if spectrum.dimensionCount == 2:
dCodes = ['H N'.split(), 'H C'.split(), ('H',)]
elif spectrum.dimensionCount == 3:
        dCodes = ['H C N'.split(), 'H H'.split(), 'C C'.split(), 'N N'.split()]
elif spectrum.dimensionCount == 4:
dCodes = ['H C H N'.split(), 'H C H C'.split()]
else:
dCodes = []
# See if we can map one of the preferred orderings
for dCode in dCodes:
pOrder = _searchAxisCodePermutations(spectrum, dCode)
if pOrder and pOrder[0] == X_DIM_INDEX:
spectrum._preferredAxisOrdering = pOrder
break
if not pOrder:
        # nothing matched; retain the default ordering [0..dimensionCount-1]
pOrder = spectrum.dimensionIndices
return
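
# Worked example (illustrative): for a 3D spectrum with axisCodes ('Nh', 'C', 'Hn'),
# the preferred 'H C N' ordering would yield the dimensionIndices permutation
# (2, 1, 0), i.e. H on the x-axis, C on the y-axis and N on the z-axis.
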
#===========================================================================================================
# Spectrum/Peak parameter management
#===========================================================================================================
def _setParameterValues(obj, parameterName:str, values:Sequence, dimensions:Sequence, dimensionCount:int) -> list:
"""A helper function to reduce code overhead in setting parameters of Spectrum and Peak
:return The list with values
CCPNINTERNAL: used in setByAxisCode and setByDimension methods of
Spectrum and Peak classes
"""
if not hasattr(obj, parameterName):
raise ValueError('object "%s" does not have parameter "%s"' %
(obj.__class__.__name__, parameterName))
if not isIterable(values):
raise ValueError('setting "%s.%s" requires "values" tuple or list; got %r' %
(obj.__class__.__name__, parameterName, values))
    if not isIterable(dimensions):
        raise ValueError('setting "%s.%s" requires "dimensions" tuple or list; got %r' %
                         (obj.__class__.__name__, parameterName, dimensions))

    if len(values) != len(dimensions):
        raise ValueError('setting "%s.%s": unequal length of "values" and "dimensions"; got %r and %r' %
                         (obj.__class__.__name__, parameterName, values, dimensions))
newValues = list(getattr(obj, parameterName))
for dim, val in zip(dimensions, values):
if dim < 1 or dim > dimensionCount:
# report error in 1-based, as the error is caught by the calling routines
raise ValueError('%s: invalid dimension "%s"; should be in range (1,%d)' %
(obj, dim, dimensionCount))
newValues[dim-1] = val
try:
setattr(obj, parameterName, newValues)
except AttributeError:
raise ValueError('setting "%s.%s": unable to set to %r' %
(obj.__class__.__name__, parameterName, newValues))
    # we get the values from the obj, just in case some have been modified
return getattr(obj, parameterName)
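
# Numeric sketch of the dimension-indexed update performed above (plain lists,
# made-up ppm values; dimensions are 1-based):
#
# >>> newValues = [8.2, 118.0, 55.0]
# >>> for dim, val in zip((2, 1), (119.0, 8.3)):
# ...     newValues[dim - 1] = val
# >>> newValues
# [8.3, 119.0, 55.0]
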
def _getParameterValues(obj, parameterName:str, dimensions:Sequence, dimensionCount:int) -> list:
"""A helper function to reduce code overhead in setting parameters of Spectrum and Peak
:return The list with values
CCPNINTERNAL: used in getByAxisCode and getByDimension methods of
Spectrum and Peak classes
"""
if not hasattr(obj, parameterName):
raise ValueError('object "%s" does not have parameter "%s"' %
(obj.__class__.__name__, parameterName))
if not isIterable(dimensions):
raise ValueError('getting "%s.%s" requires "dimensions" tuple or list; got %r' %
(obj.__class__.__name__, parameterName, dimensions))
try:
values = getattr(obj, parameterName)
except AttributeError:
raise ValueError('%s: unable to get parameter "%s"' % (obj, parameterName))
    if all(isinstance(i, tuple) for i in values):
        # values is a tuple of per-dimension tuples, e.g. as in peak.assignedNmrAtoms
        newValues = []
        for ll in values:
            newValues.append(tuple(ll[dim - 1] for dim in dimensions))
        return newValues

    newValues = []
for dim in dimensions:
if dim < 1 or dim > dimensionCount:
# report error in 1-based, as the error is caught by the calling routines
raise ValueError('%s: invalid dimension "%s"; should be in range (1,%d)' %
(obj, dim, dimensionCount))
newValues.append(values[dim-1])
return newValues
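
# Sketch of the tuple-of-tuples branch above (e.g. peak.assignedNmrAtoms);
# values are illustrative strings rather than NmrAtom objects:
#
# >>> values = [('HNa', 'Na'), ('HNb', 'Nb')]
# >>> [tuple(ll[dim - 1] for dim in (2, 1)) for ll in values]
# [('Na', 'HNa'), ('Nb', 'HNb')]
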
def _orderByDimensions(iterable, dimensions, dimensionCount) -> list:
"""Return a list of values of iterable in order defined by dimensions (default order if None).
:param iterable: an iterable (tuple, list)
:param dimensions: a tuple or list of dimensions (1..dimensionCount)
:return: a list with values defined by iterable in dimensions order
"""
if not isIterable(iterable):
raise ValueError('not an iterable; got %r' % (iterable))
values = list(iterable)
if not isIterable(dimensions):
raise ValueError('"dimensions" is not iterable; got %r' % (dimensions))
result = []
for dim in dimensions:
        if dim < 1 or dim > dimensionCount:
            raise ValueError('invalid dimension "%s"; should be in range (1,%d)' %
                             (dim, dimensionCount))
        if dim - 1 >= len(values):
            raise ValueError('invalid dimension "%s"; too large for iterable (%r)' %
                             (dim, values))
        result.append(values[dim - 1])
return result
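
# Doctest-style example of _orderByDimensions (1-based dimensions):
#
# >>> _orderByDimensions(('Hn', 'Nh', 'CO'), dimensions=(3, 1), dimensionCount=3)
# ['CO', 'Hn']
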
#===========================================================================================================
# GWV testing only
#===========================================================================================================
from ccpn.util.traits.CcpNmrTraits import List, Int, Float, TraitError
class SpectrumDimensionTrait(List):
"""
A trait to implement a Spectrum dimensional attribute; e.g. like spectrumFrequencies
"""
# GWV test
# _spectrometerFrequencies = SpectrumDimensionTrait(trait=Float(min=0.0)).tag(
# attributeName='spectrometerFrequency',
# doCopy = True
# )
isDimensional = True
    def validate(self, obj, value):
"""Validate the value
"""
if len(value) != obj.dimensionCount:
raise TraitError('Setting "%s", invalid value "%s"' % (self.name, value))
value = self.validate_elements(obj, value)
return value
def _getValue(self, obj):
"""Get the value of trait, obtained from the obj (i.e.spectrum) dimensions
"""
if (dimensionAttributeName := self.get_metadata('attributeName', None)) is None:
raise RuntimeError('Undefined dimensional attributeName for trait %r' % self.name)
value = [getattr(specDim, dimensionAttributeName) for specDim in obj.spectrumReferences]
return value
    def get(self, obj, cls=None):
try:
value = self._getValue(obj)
except (AttributeError, ValueError, RuntimeError):
# Check for a dynamic initializer.
dynamic_default = self._dynamic_default_callable(obj)
if dynamic_default is None:
raise TraitError("No default value found for %s trait of %r"
% (self.name, obj))
value = self._validate(obj, dynamic_default())
obj._trait_values[self.name] = value
return value
except Exception:
# This should never be reached.
raise TraitError('Unexpected error in DimensionTrait')
else:
self._obj = obj # last obj used for get
return value
def _setValue(self, obj, value):
"""Set the value of trait, stored in the obj (i.e.spectrum) dimensions
"""
if (dimensionAttributeName := self.get_metadata('attributeName', None)) is None:
raise RuntimeError('Undefined dimensional attributeName for trait %r' % self.name)
for axis, val in enumerate(value):
setattr(obj.spectrumReferences[axis], dimensionAttributeName, val)
    def set(self, obj, value):
new_value = self._validate(obj, value)
try:
old_value = self._getValue(obj)
except (AttributeError, ValueError, RuntimeError):
old_value = self.default_value
# obj._trait_values[self.name] = new_value
self._setValue(obj, new_value)
try:
silent = bool(old_value == new_value)
        except Exception:
# if there is an error in comparing, default to notify
silent = False
if silent is not True:
# we explicitly compare silent to True just in case the equality
# comparison above returns something other than True/False
obj._notify_trait(self.name, old_value, new_value)
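
# Hedged usage sketch for SpectrumDimensionTrait, assuming the commented
# declaration above (attribute names and values are illustrative):
#
# >>> spectrum.spectrometerFrequencies                      # doctest: +SKIP
# [600.13, 60.81]
# >>> spectrum.spectrometerFrequencies = [600.13, 150.9]    # doctest: +SKIP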