Source code for ccpn.core.lib.peakUtils

#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-04-04 11:31:49 +0100 (Mon, April 04, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

from typing import Sequence, Union
import numpy as np
from ccpn.util.Logging import getLogger
from collections import OrderedDict
from scipy.optimize import curve_fit
from ccpn.core.PeakList import GAUSSIANMETHOD, PARABOLICMETHOD
from ccpn.core.lib.ContextManagers import newObject, undoBlock, undoBlockWithoutSideBar, notificationEchoBlocking
import pandas as pd
from pandas import MultiIndex as m_ix
from ccpn.util.Common import makeIterableList, percentage


# This variables will be moved to SeriesAnalysisVarialbles.py
_POSITION = 'position'
_POINTPOSITION  = 'pointPosition'
_PPMPOSITION    = 'ppmPosition'
_LINEWIDTH      = 'lineWidth'
HEIGHT = 'height'
VOLUME = 'volume'
POSITIONS =  f'{_POSITION}s'
LINEWIDTHS = f'{_LINEWIDTH}s'
POINTPOSITIONS = f'{_POINTPOSITION}s'
PPMPOSITIONS = f'{_PPMPOSITION}s'
RAW = 'raw'
DELTAS = 'deltas'
DELTA = '\u0394'
Delta = '\u03B4'
MODES = [POSITIONS, HEIGHT, VOLUME, LINEWIDTHS]
DISPLAYDATA = [DELTA + Delta, RAW]
OTHER = 'Other'
H = 'H'
N = 'N'
C = 'C'
DefaultAtomWeights = OrderedDict(((H, 7.00), (N, 1.00), (C, 4.00), (OTHER, 1.00)))
NR_ID = 'NR_ID'


[docs]class Dictlist(dict): def __setitem__(self, key, value): try: self[key] except KeyError: super(Dictlist, self).__setitem__(key, []) self[key].append(value)
[docs]def getMultipletPosition(multiplet, dim, unit='ppm'): try: # skip if the position is None, otherwise check for dimensions if multiplet.position is None: return if multiplet.position[dim] is None: value = None #"*NOT SET*" elif unit == 'ppm': value = multiplet.position[dim] # NOT implemented for multiplets # elif unit == 'point': # value = multiplet.pointPositions[dim] elif unit == 'Hz': value = multiplet.position[dim] * multiplet.multipletList.spectrum.spectrometerFrequencies[dim] else: # sampled raise ValueError("Unit passed to getPeakPosition must be 'ppm', 'point', or 'Hz', was %s" % unit) if not value: return if isinstance(value, (int, float, np.float32, np.float64)): return float(value) # '{0:.2f}'.format(value) except Exception as e: getLogger().warning('Error on setting Position. %s' % e)
# return None
[docs]def getMultipletLinewidth(peak, dim): if dim < len(peak.lineWidths): lw = peak.lineWidths[dim] if lw: return float(lw)
[docs]def getPeakPosition(peak, dim, unit='ppm'): if len(peak.dimensionNmrAtoms) > dim: # peakDim = peak.position[dim] if peak.position[dim] is None: value = None #"*NOT SET*" elif unit == 'ppm': value = peak.position[dim] elif unit == 'point': value = peak.pointPositions[dim] elif unit == 'Hz': value = peak.position[dim] * peak.peakList.spectrum.spectrometerFrequencies[dim] else: # sampled raise ValueError("Unit passed to getPeakPosition must be 'ppm', 'point', or 'Hz', was %s" % unit) if isinstance(value, (int, float, np.float32, np.float64)): return float(value) # '{0:.4f}'.format(value) return None
# if isinstance(value, [int, float]): # # if type(value) is int or type(value) in (float, float32, float64): # return '%7.2f' % float(value)
[docs]def getPeakAnnotation(peak, dim, separator=', '): if len(peak.dimensionNmrAtoms) > dim: return separator.join([dna.pid.id for dna in peak.dimensionNmrAtoms[dim] if not (dna.isDeleted or dna._flaggedForDelete)])
[docs]def getPeakLinewidth(peak, dim): """Return the lineWidth in dimension 'dim' for the peakTable entries """ lineWidths = peak.lineWidths if lineWidths and dim < len(lineWidths): lw = peak.lineWidths[dim] if lw is not None: return float(lw) # need to return this as a string otherwise the table changes between 'None' and 'nan' return 'None'
def _pairIntersectionPoints(intersectionPoints): """ Yield successive pair chunks from list of intersectionPoints """ for i in range(0, len(intersectionPoints), 2): pair = intersectionPoints[i:i + 2] if len(pair) == 2: yield pair def _getIntersectionPoints(x, y, line): """ :param line: x points of line to intersect y points :return: list of intersecting points """ z = y - line dx = x[1:] - x[:-1] cross = np.sign(z[:-1] * z[1:]) x_intersect = x[:-1] - dx / (z[1:] - z[:-1]) * z[:-1] negatives = np.where(cross < 0) points = x_intersect[negatives] return points
[docs]def estimate1DPeakFWHM(peak): """ Estimate the 1D peak Full Width at Half Maximum. Returns ------- width: The width for the peak in points. widthHeight: The height of the contour lines at which the `width` was evaluated. limits: tuple, Interpolated positions of left and right intersection points of a horizontal line at the respective evaluation height. """ from scipy.signal import peak_widths from ccpn.core.Peak import Peak import numpy as np if not isinstance(peak, Peak): return [None, None, (None, None)] if peak.spectrum.dimensionCount > 1: return [None, None, (None, None)] y = peak.spectrum.intensities pointPositions = np.array([int(peak.pointPosition[0])]) widths, widthHeight, *limits = peak_widths(y, pointPositions, rel_height=0.5) limits = tuple(zip(*limits)) if widths and len(widths)>0: return widths[0], widthHeight[0], limits[0] return [None, None, (None, None)]
def _getAtomWeight(axisCode, atomWeights) -> float or int: """ :param axisCode: str of peak axis code :param atomWeights: dictionary of atomWeights eg {'H': 7.00, 'N': 1.00, 'C': 4.00, 'Other': 1.00} :return: float or int from dict atomWeights """ value = 1.0 if len(axisCode) > 0: firstLetterAxisCode = axisCode[0] if firstLetterAxisCode in atomWeights: value = atomWeights[firstLetterAxisCode] else: if OTHER in atomWeights: if atomWeights[OTHER] != 1: value = atomWeights[OTHER] else: value = 1.0 return value def _traverse(o, tree_types=(list, tuple)): """used to flat the state in a long list """ if isinstance(o, tree_types): for value in o: for subvalue in _traverse(value, tree_types): yield subvalue else: yield o def _getPeaksForNmrResidueByNmrAtomNames(nmrResidue, nmrAtomsNames, spectra): peaks = [] nmrAtoms = [] peakLists = [sp.peakLists[-1] if len(sp.peakLists) > 0 else getLogger().warning('No PeakList for %s' % sp) for sp in spectra] # take only the last peakList if more then 1 for nmrAtomName in nmrAtomsNames: nmrAtom = nmrResidue.getNmrAtom(str(nmrAtomName)) if nmrAtom is not None: nmrAtoms.append(nmrAtom) filteredPeaks = [] nmrAtomsNamesAvailable = [] for nmrAtom in nmrAtoms: for peak in nmrAtom.assignedPeaks: if peak.peakList.spectrum in spectra: if nmrAtom.name in nmrAtomsNames: if peak.peakList in peakLists: filteredPeaks.append(peak) nmrAtomsNamesAvailable.append(nmrAtom.name) if len(list(set(filteredPeaks))) == len(spectra): # deals when a residue is assigned to multiple peaks if len(list(set(nmrAtomsNamesAvailable))) == len(nmrAtomsNames): peaks += filteredPeaks else: for peak in filteredPeaks: assignedNmrAtoms = _traverse(peak.assignedNmrAtoms) if all(assignedNmrAtoms): assignedNmrAtoms = [na.name for na in assignedNmrAtoms] if len(assignedNmrAtoms) > 1: if list(assignedNmrAtoms) == nmrAtomsNames: peaks += [peak] if len(nmrAtomsNames) == 1: if nmrAtomsNames[0] in assignedNmrAtoms: peaks += [peak] return list(OrderedDict.fromkeys(peaks))
[docs]def getNmrResiduePeakProperty(nmrResidue, nmrAtomsNames, spectra, theProperty='height'): """ :param nmrResidue: :param nmrAtomsNames: nmr Atoms to compare. str 'H', 'N', 'CA' etc :param spectra: compare peaks only from given spectra :param theProperty: 'height' or 'volume' :return: """ ll = [] if len(spectra) <= 1: return if not theProperty in ['height', 'volume']: getLogger().warning('Property not currently available %s' % theProperty) return peaks = _getPeaksForNmrResidueByNmrAtomNames(nmrResidue, nmrAtomsNames, spectra) if len(peaks) > 0: for peak in peaks: if peak.peakList.spectrum in spectra: p = getattr(peak, theProperty) ll.append(p) return ll, peaks
[docs]def getNmrResiduePeakHeight(nmrResidue, nmrAtomsNames, spectra): """ :param nmrResidue: :param nmrAtomsNames: nmr Atoms to compare. str 'H', 'N', 'CA' etc :param spectra: compare peaks only from given spectra :return: """ getLogger().warning('Deprecated. Used getNmrResiduePeakProperty with theProperty = "height"') return getNmrResiduePeakProperty(nmrResidue, nmrAtomsNames, spectra, theProperty='height')
[docs]def getRawDataFrame(nmrResidues, nmrAtomsNames, spectra, theProperty): dfs = [] names = [sp.name for sp in spectra] for nmrResidue in nmrResidues: if not '-' in nmrResidue.sequenceCode.replace('+', '-'): # not consider the +1 and -1 residues ll = getNmrResiduePeakProperty(nmrResidue, nmrAtomsNames, spectra, theProperty) if len(ll) > 0: df = pd.DataFrame([ll], index=[nmrResidue.pid], columns=names) dfs.append(df) data = pd.concat(dfs) return data
def _getPeaksForNmrResidue(nmrResidue, nmrAtomsNames, spectra): if len(spectra) <= 1: return _peaks = _getPeaksForNmrResidueByNmrAtomNames(nmrResidue, nmrAtomsNames, spectra) usepeaks = [] if len(_peaks) > 0: for peak in _peaks: if peak.peakList.spectrum in spectra: usepeaks.append(peak) return usepeaks
[docs]def getNmrResidueDeltas(nmrResidue, nmrAtomsNames, spectra, mode=POSITIONS, atomWeights=None): """ :param nmrResidue: :param nmrAtomsNames: nmr Atoms to compare. str 'H', 'N', 'CA' etc :param spectra: compare peaks only from given spectra :return: """ deltas = [] if len(spectra) <= 1: return peaks = _getPeaksForNmrResidueByNmrAtomNames(nmrResidue, nmrAtomsNames, spectra) if atomWeights is None: atomWeights = DefaultAtomWeights if len(peaks) > 0: for peak in peaks: if peak.peakList.spectrum in spectra: # try: #some None value can get in here if mode == POSITIONS: deltaTemp = [] for i, axisCode in enumerate(peak.axisCodes): if len(axisCode) > 0: if any(s.startswith(axisCode[0]) for s in nmrAtomsNames): weight = _getAtomWeight(axisCode, atomWeights) if peaks[0] != peak: # dont' compare to same peak delta = peak.position[i] - peaks[0].position[i] delta = delta ** 2 delta = delta * weight deltaTemp.append(delta) # deltaInts.append(((peak.position[i] - list(peaks)[0].position[i]) * weight) ** 2) # delta += ((list(peaks)[0].position[i] - peak.position[i]) * weight) ** 2 if len(deltaTemp) > 0: delta = sum(deltaTemp) ** 0.5 deltas += [delta] if mode == VOLUME: if list(peaks)[0] != peak: # dont' compare to same peak if not peak.volume or not peaks[0].volume or peaks[0].volume == 0: getLogger().warning('Volume has to be set for peaks: %s, %s' % (peak, peaks[0])) break delta1Atoms = (peak.volume / list(peaks)[0].volume) deltas += [((delta1Atoms) ** 2) ** 0.5, ] if mode == HEIGHT: if list(peaks)[0] != peak: # dont' compare to same peak if not peak.height or not peaks[0].height or peaks[0].height == 0: getLogger().warning('Height has to be set for peaks: %s, %s' % (peak, peaks[0])) break delta1Atoms = (peak.height / list(peaks)[0].height) deltas += [((delta1Atoms) ** 2) ** 0.5, ] if mode == LINEWIDTHS: deltaTemp = [] for i, axisCode in enumerate(peak.axisCodes): if list(peaks)[0] != peak: # dont' compare to same peak if axisCode: if len(axisCode) > 0: if any(s.startswith(axisCode[0]) for s in nmrAtomsNames): weight = _getAtomWeight(axisCode, atomWeights) if not peak.lineWidths[i] or not peaks[0].lineWidths[i]: getLogger().warning('lineWidth has to be set for peaks: %s, %s' % (peak, peaks[0])) break delta = ((peak.lineWidths[i] - list(peaks)[0].lineWidths[i]) * weight) ** 2 deltaTemp.append(delta) if len(deltaTemp) > 0: delta = sum(deltaTemp) ** 0.5 deltas += [delta] # except Exception as e: # message = 'Error for calculation mode: %s on %s and %s. ' % (mode, peak.pid, list(peaks)[0].pid) + str(e) # getLogger().debug(message) if deltas and not None in deltas: return round(float(np.mean(deltas)), 3) return
def _getKd(func, x, y): if len(x) <= 1: return try: param = curve_fit(func, x, y, maxfev=6000) bindingUnscaled, bmax = param[0] yScaled = y / bmax paramScaled = curve_fit(func, x, yScaled, maxfev=6000) kd, bmax = paramScaled[0] except Exception as err: getLogger().warning('Impossible to estimate Kd values. %s' % err) kd, bmax = [None, None] return kd
[docs]def oneSiteBindingCurve(x, kd, bmax): return (bmax * x) / (x + kd)
[docs]def exponenial_func(x, a, b): return a * np.exp(-b * x)
def _fit1SiteBindCurve(bindingCurves, aFunc=oneSiteBindingCurve, xfStep=0.01, xfPercent=30): """ :param bindingCurves: DataFrame as: Columns -> float or int. Used as xs points (e.g. concentration/time/etc value) rows -> float or int. Used as ys points (e.g. Deltadelta in ppm) the actual curve points index -> obj. E.g. nmrResidue obj. used as identifier for the curve origin | index | 1 | 2 | |----------------+--------|-------| | obj1 | 1.0 | 1.1 | | obj2 | 2.0 | 1.2 | :param aFunc: Default: oneSiteBindingCurve. :param xfStep: number of x points for generating the fitted curve. :param xfPercent: percent to add to max X value of the fitted curve, so to have a longer curve after the last experimental value. :return: tuple of parameters for plotting fitted curves. x_atHalf_Y: the x value for half of Y. Used as estimated kd xs: array of xs. Original xs points from the dataFrame columns yScaled: array of yScaled. Scaled to have values 0 to 1 xf: array of x point for the new fitted curve. A range from 0 to max of xs. yf: array the fitted curve """ from scipy.optimize import curve_fit from ccpn.util.Common import percentage errorValue = (None,) * 6 if aFunc is None or not callable(aFunc): getLogger().warning("Error. Fitting curve %s is not callable" % aFunc) return errorValue if bindingCurves is None: getLogger().warning("Error. Binding curves not found") return errorValue data = bindingCurves.replace(np.nan, 0) ys = data.values.flatten(order='F') #puts all y values in a single 1d array. xss = np.array([data.columns] * data.shape[0]) xs = xss.flatten(order='F') # #puts all x values in a 1d array preserving the original y positions (order='F'). # print(( xs, ys), '$$$') if len(xs) <= 1: return errorValue #not enough datapoints try: param = curve_fit(aFunc, xs, ys) xhalfUnscaled, bMaxUnscaled = param[0] yScaled = ys / bMaxUnscaled #scales y to have values 0-1 paramScaled = curve_fit(aFunc, xs, yScaled) xfRange = np.max(xs) - np.min(xs) xfPerc = percentage(xfPercent, xfRange) xfMax = np.max(xs) + xfPerc xf = np.arange(0, xfMax, step=xfStep) yf = aFunc(xf, *paramScaled[0]) x_atHalf_Y, bmax = paramScaled[0] return (x_atHalf_Y, bmax, xs, yScaled, xf, yf) except Exception as err: getLogger().warning('Impossible to estimate Kd value %s' % (err)) return errorValue def _fitExpDecayCurve(bindingCurves, aFunc=exponenial_func, xfStep=0.01, xfPercent=80, p0=(1, 0.1)): """ :param TODO """ errorValue = (None,) * 6 if aFunc is None or not callable(aFunc): getLogger().warning("Error. Fitting curve %s is not callable" % aFunc) return errorValue if bindingCurves is None: getLogger().warning("Error. Binding curves not found") return errorValue data = bindingCurves.replace(np.nan, 0) ys = data.values.flatten(order='F') #puts all y values in a single 1d array. xss = np.array([data.columns] * data.shape[0]) xs = xss.flatten(order='F') # #puts all x values in a 1d array preserving the original y positions (order='F'). if len(xs) <= 1: return errorValue #not enough datapoints # try: popt, pcov = curve_fit(aFunc, xs, ys, p0=p0) interc, slope = popt # yScaled = ys / interc # scales y to have values 0-1 # poptScaled, pcov = curve_fit(aFunc, xs, yScaled) yScaled = (ys - np.min(ys)) / (np.max(ys) - np.min(ys)) yScaled[np.isnan(yScaled)] = 0 # cannot fit nans popt, pcov = curve_fit(exponenial_func, xs, yScaled, p0=popt) interc, slope = popt xfRange = np.max(xs) - np.min(xs) xfPerc = percentage(xfPercent, xfRange) xfMax = np.max(xs) + xfPerc xf = np.arange(0, xfMax, step=xfStep) yf = aFunc(xf, *popt) return (xs, yScaled, xf, yf, *popt) # except Exception as err: # getLogger().warning('Impossible to estimate Kd value %s' % (err)) # return errorValue
[docs]def snapToExtremum(peak: 'Peak', halfBoxSearchWidth: int = 3, halfBoxFitWidth: int = 3, minDropFactor: float = 0.1, fitMethod: str = PARABOLICMETHOD, searchBoxMode=False, searchBoxDoFit=False): """Snap the position of the peak the nearest extremum. Assumes called with an existing peak, will fit within a box ±halfBoxSearchWidth about the current peak position. """ from ccpn.framework.Application import getApplication spectrum = peak.spectrum numDim = spectrum.dimensionCount getApp = getApplication() if numDim == 1: # do the fit for 1D here doNeg = getApp.preferences.general.negativePeakPick1D _snap1DPeakToClosestExtremum(peak, maximumLimit=0.1, doNeg=doNeg) else: from ccpn.core.lib.PeakPickers.PeakPickerNd import PeakPickerNd spectrum = peak.spectrum myPeakPicker = PeakPickerNd(spectrum=spectrum) myPeakPicker.setParameters(dropFactor=minDropFactor, fitMethod=fitMethod, searchBoxMode=searchBoxMode, searchBoxDoFit=searchBoxDoFit, halfBoxSearchWidth=halfBoxSearchWidth, halfBoxFitWidth=halfBoxFitWidth, searchBoxWidths=getApp.preferences.general.searchBoxWidthsNd, ) myPeakPicker.snapToExtremum(peak)
[docs]def peakParabolicInterpolation(peak: 'Peak', update=False): """ return a (position, height, heightError) tuple using parabolic interpolation of the peak.position :param peak: a core.Peak instance or Pid or valid pid-string :param update: flag indicating peak position and height to be updated :return: (position, height) tuple; position is a list with length spectrum.dimensionCount """ import numpy from ccpn.core.Peak import Peak from ccpn.core.lib.Pid import Pid from ccpn.framework.Application import getApplication from ccpn.util.Parabole import Parabole if isinstance(peak, Peak): pass elif isinstance(peak, (Pid, str)): _peak = getApplication().project.getByPid(peak) if _peak is None: raise ValueError('Error retrieving valid peak instance from "%s"' % peak) peak = _peak else: raise ValueError('Expected a Peak, Pid or valid pid-string for the "peak" argument') spectrum = peak.peakList.spectrum spectrum.checkValidPath() # get the position as the nearest grid point position = [int(p + 0.5) for p in peak.pointPositions] # # get the data +/-1 point along each axis # sliceTuples = [(p - 1, p + 1) for p in position] # nb: sliceTuples run [1,n] with n inclusive _valuesPerPoint = spectrum.ppmPerPoints _axisCodes = spectrum.axisCodes _regionData = {axisCode: (ppm - valPerPoint / 2, ppm + valPerPoint / 2,) for axisCode, ppm, valPerPoint in zip(_axisCodes, peak.ppmPositions, _valuesPerPoint)} data = spectrum.getRegion(**_regionData) heights = [0.0] * spectrum.dimensionCount newPosition = position[:] for axis in range(spectrum.dimensionCount): # get the three y-values along axis, but centered for the other axes slices = [slice(1, 2) for d in range(spectrum.dimensionCount)] slices[axis] = slice(0, 3) yValues = data[tuple(slices[::-1])].flatten() # create points list for the Parabole method point = position[axis] points = [(p, yValues[i]) for i, p in enumerate((point - 1, point, point + 1))] parabole = Parabole.fromPoints(points) newPosition[axis], heights[axis] = parabole.maxValue() arr = numpy.array(heights) height = float(numpy.average(arr)) heightError = np.max(arr) - np.min(arr) if update: peak.pointPositions = newPosition peak.height = height peak.heightError = heightError return newPosition, height, heightError
[docs]def estimateVolumes(peaks: Sequence[Union[str, 'Peak']], volumeIntegralLimit=2.0): """Estimate the volumes for the peaks :param peaks: list of peaks as pids or strings :param volumeIntegralLimit: integral width as a multiple of lineWidth (FWHM) """ # move to peakList from ccpn.core.Peak import Peak from ccpn.framework.Application import getApplication if not getApplication() and not getApplication().project: raise RuntimeError('There is no project') getByPid = getApplication().project.getByPid # error checking - that the peaks are valid peakList = makeIterableList(peaks) pks = [getByPid(peak) if isinstance(peak, str) else peak for peak in peakList] for pp in pks: if not isinstance(pp, Peak): raise TypeError('%s is not of type Peak' % str(pp)) # estimate the volume for each peak for pp in pks: height = pp.height lineWidths = pp.lineWidths if lineWidths and None not in lineWidths and height: pp.estimateVolume(volumeIntegralLimit=volumeIntegralLimit) else: getLogger().warning('Peak %s contains undefined height/lineWidths' % str(pp))
[docs]def movePeak(peak, ppmPositions, updateHeight=True): """Move a peak based on it's delta shift and optionally update to the height at the new position """ with undoBlockWithoutSideBar(): peak.position = ppmPositions if updateHeight: # get the interpolated height at this position peak.height = peak.peakList.spectrum.getHeight(ppmPositions)
[docs]def updateHeight(peak): with undoBlockWithoutSideBar(): peak.height = peak.peakList.spectrum.getHeight(peak.position)
# added for pipelines def _1Dregions(x, y, value, lim=0.01): # centre of position, peak position # lim in ppm where to look left and right referenceRegion = [value - lim, value + lim] point1, point2 = np.max(referenceRegion), np.min(referenceRegion) x_filtered = np.where((x <= point1) & (x >= point2)) # only the region of interest for the reference spectrum y_filtered = y[x_filtered] x_filtered = x[x_filtered] return x_filtered, y_filtered def _1DregionsFromLimits(x, y, limits): # centre of position, peak position # lim in ppm where to look left and right point1, point2 = np.max(limits), np.min(limits) x_filtered = np.where((x <= point1) & (x >= point2)) # only the region of interest for the reference spectrum y_filtered = y[x_filtered] x_filtered = x[x_filtered] return x_filtered, y_filtered def _snap1DPeaksToExtremaSimple(peaks, limit=0.003): for peak in peaks: # peaks can be from diff peakLists if peak is not None: x = peak.peakList.spectrum.positions y = peak.peakList.spectrum.intensities x_filtered, y_filtered = _1Dregions(x, y, peak.position[0], lim=limit) if len(y_filtered) > 0: idx = y_filtered.argmax() peakHeight = y_filtered[idx] peakPos = x[x_filtered[0][idx]] # ppm positions peak.height = float(peakHeight) peak.position = [float(peakPos), ]
[docs]def find_nearest(array, value): array = np.asarray(array) idx = (np.abs(array - value)).argmin() return array[idx]
[docs]def snap1DPeaksToExtrema(peaks, maximumLimit=0.1, figOfMeritLimit=1, doNeg=True): with undoBlockWithoutSideBar(): with notificationEchoBlocking(): if len(peaks) > 0: peaks.sort(key=lambda x: x.position[0], reverse=False) # reorder peaks by position for peak in peaks: # peaks can be from diff peakLists if peak is not None: _snap1DPeakToClosestExtremum(peak, maximumLimit=maximumLimit, figOfMeritLimit=figOfMeritLimit, doNeg=doNeg)
[docs]def recalculatePeaksHeightAtPosition(peaks): with undoBlockWithoutSideBar(): with notificationEchoBlocking(): if len(peaks) > 0: for peak in peaks: # peaks can be from diff peakLists if peak is not None: height = peak.peakList.spectrum.getHeight(peak.position) peak.height = height
def _fitBins(y, bins): # fit a gauss curve over the bins mu = np.mean(y) sigma = np.std(y) fittedCurve = 1 / (sigma * np.sqrt(2 * np.pi)) * np.exp(-(bins - mu) ** 2 / (2 * sigma ** 2)) return fittedCurve def _getBins(y, binCount=None): """ :param y: :return: ### plot example: # import matplotlib.pyplot as plt # to plot # plt.hist(y, bins=int(len(y)/2), density=True) # plt.plot(edges, fittedCurve, linewidth=2, color='r') # plt.show() """ from scipy.stats import binned_statistic binCount = binCount or int(len(y) / 2) statistics, edges, binNumbers = binned_statistic(y, y, bins=binCount, statistic='mean') mostCommonBinNumber = np.argmax(np.bincount(binNumbers)) highestValues = y[binNumbers == mostCommonBinNumber] # values corresponding to most frequent binNumber fittedCurve = _fitBins(y, edges) fittedCurveExtremum = edges[np.argmax(fittedCurve)] # value at the Extremum of the fitted curve return statistics, edges, binNumbers, fittedCurve, mostCommonBinNumber, highestValues, fittedCurveExtremum
[docs]def snap1DPeaksAndRereferenceSpectrum(peaks, maximumLimit=0.1, useAdjacientPeaksAsLimits=False, doNeg=False, figOfMeritLimit=1, spectrum=None, autoRereferenceSpectrum=False): """ Snap all peaks to closest maxima Steps: - reorder the peaks by heights to give higher peaks priority to the snap - 1st iteration: search for nearest maxima and calculate delta positions (don't set peak.position here yet) - use deltas to fit patterns of shifts and detect the most probable global shift - use the global shift to re-reference the spectrum - 2nd iteration: re-search for nearest maxima - set newly detected position to peak if found better fits - re-set the spectrum referencing to original (if not requested as argument) :param peaks: list of peaks to snap :param maximumLimit: float to use as + left-right limits from peak position where to search new maxima :param useAdjacientPeaksAsLimits: bool. use adj peak position as left-right limits. don't search maxima after adjacent peaks :param doNeg: snap also negative peaks :param figOfMeritLimit: float. don't snap peaks with FOM below limit threshold :param spectrum: the spectum obj. optional if all peaks belong to the same spectrum :return: """ if not peaks: getLogger().warning('Cannot snap peaks. No peaks found') return [] if not spectrum: spectrum = peaks[0].peakList.spectrum # - reorder the peaks by heights to give higher peaks priority to the snap peaks.sort(key=lambda x: x.height, reverse=True) # reorder peaks by height oPositions, oHeights = [x.position for x in peaks], [x.height for x in peaks] nPositions, nHeights = [], [] # - 1st iteration: search for nearest maxima and calculate deltas for peak in peaks: if peak is not None: position, height = _get1DClosestExtremum(peak, maximumLimit=maximumLimit, useAdjacientPeaksAsLimits=useAdjacientPeaksAsLimits, doNeg=doNeg, figOfMeritLimit=figOfMeritLimit) nPositions.append(position) nHeights.append(height) deltas = np.array(nPositions) - np.array(oPositions) deltas = deltas.flatten() if len(peaks) == 1: peaks[0].position = nPositions[0] peaks[0].height = nHeights[0] return deltas[0] # - use deltas to fit patterns of shifts and detect the most probable global shift stats, edges, binNumbers, fittedCurve, mostCommonBinNumber, highestValues, fittedCurveExtremum = _getBins(deltas) shift = max(highestValues) oReferenceValues = spectrum.referenceValues oPositions = spectrum.positions # - use the global shift to re-reference the spectrum spectrum.referenceValues = [spectrum.referenceValues[0] - shift] spectrum.positions = spectrum.positions - shift # - 2nd iteration: re-search for nearest maxima for peak in peaks: if peak is not None: oPosition = peak.position peak.position = [peak.position[0] + shift,] # - use the shift to re-reference the peak to the moved spectrum position, height = _get1DClosestExtremum(peak, maximumLimit=maximumLimit, useAdjacientPeaksAsLimits=useAdjacientPeaksAsLimits, doNeg=doNeg, figOfMeritLimit=figOfMeritLimit) # - set newly detected position if found better fits if peak.position == position: # Same position detected. Revert peak.height = peak.peakList.spectrum.getHeight(oPosition) peak.position = oPosition else: peak.position = position peak.height = height # - re-set the spectrum referencing to original (if not requested as argument) if not autoRereferenceSpectrum: spectrum.referenceValues = oReferenceValues spectrum.positions = oPositions # check for missed maxima or peaks snapped to height@position but had other unpicked maxima close-by return shift
def _add(x, y): if y > 0: return _add(x, y - 1) + 1 elif y < 0: return _add(x, y + 1) - 1 else: return x def _sub(x, y): if y > 0: return _sub(x, y - 1) - 1 elif y < 0: return _sub(x, y + 1) + 1 else: return x def _getAdjacentPeakPositions1D(peak): positions = [p.position[0] for p in peak.peakList.peaks] positions.sort() queryPos = peak.position[0] tot = len(positions) idx = positions.index(queryPos) previous, next = True, True previousPeakPosition, nextPeakPosition = None, None if idx == 0: previous = False if idx == tot - 1: next = False if previous: previousPeakPosition = positions[positions.index(queryPos) - 1] if next: nextPeakPosition = positions[positions.index(queryPos) + 1] return previousPeakPosition, nextPeakPosition def _get1DClosestExtremum(peak, maximumLimit=0.1, useAdjacientPeaksAsLimits=False, doNeg=False, figOfMeritLimit=1): """ :param peak: :param maximumLimit: don't snap peaks over this threshold in ppm :param useAdjacientPeaksAsLimits: stop a peak to go over pre-existing peaks (to the left/right) :param doNeg: include negative peaks as solutions :param figOfMeritLimit: skip if below this threshold and give only height at position :return: position, height : position is a list of length 1, height is a float search maxima close to a given peak based on the maximumLimit (left/right) or using the adjacent peaks position as limits. return the nearest coordinates position, height """ from ccpn.core.lib.SpectrumLib import estimateNoiseLevel1D from ccpn.core.lib.PeakPickers.PeakPicker1D import _find1DMaxima spectrum = peak.peakList.spectrum x = spectrum.positions y = spectrum.intensities position, height = peak.position, peak.height if peak.figureOfMerit < figOfMeritLimit: height = peak.peakList.spectrum.getHeight(peak.position) return position, height if useAdjacientPeaksAsLimits: # a left # b right limit a, b = _getAdjacentPeakPositions1D(peak) if not a: # could not find adjacient peaks if the snapping peak is the first or last if peak.position[0] > 0: #it's positive a = peak.position[0] - maximumLimit else: a = peak.position[0] + maximumLimit if not b: if peak.position[0] > 0: # it's positive b = peak.position[0] + maximumLimit else: b = peak.position[0] - maximumLimit else: a, b = peak.position[0] - maximumLimit, peak.position[0] + maximumLimit # refind maxima noiseLevel = spectrum.noiseLevel negativeNoiseLevel = spectrum.negativeNoiseLevel if not noiseLevel: # estimate as you can from the spectrum spectrum.noiseLevel, spectrum.negativeNoiseLevel = noiseLevel, negativeNoiseLevel = estimateNoiseLevel1D(y) if not negativeNoiseLevel: noiseLevel, negativeNoiseLevel = estimateNoiseLevel1D(y) spectrum.negativeNoiseLevel = negativeNoiseLevel x_filtered, y_filtered = _1DregionsFromLimits(x, y, [a, b]) maxValues, minValues = _find1DMaxima(y_filtered, x_filtered, positiveThreshold=noiseLevel, negativeThreshold=negativeNoiseLevel, findNegative=doNeg) allValues = maxValues + minValues if len(allValues) > 0: allValues = np.array(allValues) positions = allValues[:, 0] heights = allValues[:, 1] nearestPosition = find_nearest(positions, peak.position[0]) nearestHeight = heights[positions == nearestPosition] if useAdjacientPeaksAsLimits: if a == nearestPosition or b == nearestPosition: # avoid snapping to an existing peak, as it might be a wrong snap. height = peak.peakList.spectrum.getHeight(peak.position) # elif abs(nearestPosition) > abs(peak.position[0] + maximumLimit): # avoid snapping on the noise if not maximum found # peak.height = peak.peakList.spectrum.getHeight(peak.position) else: position = [float(nearestPosition), ] height = nearestHeight else: a, b = _getAdjacentPeakPositions1D(peak) if float(nearestPosition) in (a,b): # avoid snapping to an existing peak, height = peak.peakList.spectrum.getHeight(peak.position) else: position = [float(nearestPosition), ] height = nearestHeight else: height = peak.peakList.spectrum.getHeight(peak.position) return position, float(height) def _snap1DPeakToClosestExtremum(peak, maximumLimit=0.1, doNeg=False, figOfMeritLimit=1): """ It snaps a peak to its closest extremum, that can be considered as a peak. it uses adjacent peak positions as boundaries. However if no adjacent peaks then uses the maximumlimits. Uses peak :param peak: obj peak :param maximumLimit: maximum tolerance left or right from the peak position (ppm) """ position, height = _get1DClosestExtremum(peak, maximumLimit, doNeg=doNeg, figOfMeritLimit=figOfMeritLimit) with undoBlockWithoutSideBar(): with notificationEchoBlocking(): peak.position = position peak.height = height
[docs]def getSpectralPeakHeights(spectra, peakListIndexes: list = None) -> pd.DataFrame: return _getSpectralPeakPropertyAsDataFrame(spectra, peakProperty=HEIGHT, peakListIndexes=peakListIndexes)
[docs]def getSpectralPeakVolumes(spectra, peakListIndexes: list = None) -> pd.DataFrame: return _getSpectralPeakPropertyAsDataFrame(spectra, peakProperty=VOLUME, peakListIndexes=peakListIndexes)
[docs]def getSpectralPeakHeightForNmrResidue(spectra, peakListIndexes: list = None) -> pd.DataFrame: """ return: Pandas DataFrame with the following structure: Index: ID for the nmrResidue(s) assigned to the peak ; Columns => Spectrum series values sorted by ascending values, if series values are not set, then the spectrum name is used instead. | SP1 | SP2 | SP3 NR_ID | | | ------------+-----------+-----------+---------- A.1.ARG | 10 | 100 | 1000 """ df = getSpectralPeakHeights(spectra, peakListIndexes) newDf = df[df[NR_ID] != ''] # remove rows if NR_ID is not defined newDf = newDf.reset_index(drop=True).groupby(NR_ID).max() return newDf
[docs]def getSpectralPeakVolumeForNmrResidue(spectra, peakListIndexes:list=None) -> pd.DataFrame: """ return: Pandas DataFrame with the following structure: Index: ID for the nmrResidue(s) assigned to the peak ; Columns => Spectrum series values sorted by ascending values, if series values are not set, then the spectrum name is used instead. | SP1 | SP2 | SP3 NR_ID | | | ------------+-----------+-----------+---------- A.1.ARG | 10 | 100 | 1000 """ df = getSpectralPeakVolumes(spectra, peakListIndexes) newDf = df[df[NR_ID] != ''] # remove rows if NR_ID is not defined newDf = newDf.reset_index(drop=True).groupby(NR_ID).max() return newDf
def _getSpectralPeakPropertyAsDataFrame(spectra, peakProperty=HEIGHT, NR_ID=NR_ID, peakListIndexes:list=None): """ :param spectra: list of spectra :param peakProperty: 'height'or'volume' :param NR_ID: columnName for the NmrResidue ID :param peakListIndex: list of peakList indexes for getting the right peakList from the given spectra, default: the last peakList available :return: Pandas DataFrame with the following structure: Index: multiIndex => axisCodes as levels; Columns => NR_ID: the nmrResidue(s) assigned for the peak if available Spectrum series values sorted by ascending values, if series values are not set, then the spectrum name is used instead. | NR_ID | SP1 | SP2 | SP3 H N | | | | -------------+-------- +-----------+-----------+--------- 7.5 104.3 | A.1.ARG | 10 | 100 | 1000 to sort the dataframe by an axisCode, eg 'H' use: df = df.sort_index(level='H') """ dfs = [] if peakListIndexes is None: peakListIndexes = [-1] * len(spectra) for spectrum, ix in zip(spectra, peakListIndexes): positions = [] values = [] nmrResidues = [] serieValue = spectrum.name # use spectrumName as default. if series defined use that instead. if len(spectrum.spectrumGroups) > 0: sGserieValue = spectrum._getSeriesItem(spectrum.spectrumGroups[-1]) if sGserieValue is not None: serieValue = sGserieValue peaks = spectrum.peakLists[ix].peaks peaks.sort(key=lambda x: x.position, reverse=True) for peak in peaks: positions.append(peak.position) values.append(getattr(peak, peakProperty, None)) assignedResidues = list(set(filter(None, map(lambda x: x.nmrResidue.id, makeIterableList(peak.assignments))))) nmrResidues.append(", ".join(assignedResidues)) _df = pd.DataFrame(values, columns=[serieValue], index=m_ix.from_tuples(positions, names=spectrum.axisCodes)) _df[NR_ID] = nmrResidues _df = _df[~_df.index.duplicated()] dfs.append(_df) df = pd.concat(dfs, axis=1, levels=0) df[NR_ID] = df.T[df.columns.values == NR_ID].apply(lambda x: ' '.join(set([item for item in x[x.notnull()]]))) df = df.loc[:, ~df.columns.duplicated()] cols = list(df.columns) resColumn = cols.pop(cols.index(NR_ID)) sortedCols = sorted(cols, reverse=False) sortedCols.insert(0, resColumn) return df[sortedCols] def _getPeakSNRatio(peak, factor=2.5): """ Estimate the Signal to Noise ratio based on the spectrum Positive and Negative noise values. If Noise thresholds are not defined in the spectrum, then they are estimated as well. If only the positive noise threshold is defined, the negative noise threshold will be the inverse of the positive. SNratio = |factor*(height/|NoiseMax-NoiseMin|)| height is the peak height NoiseMax is the spectrum positive noise threshold NoiseMin is the spectrum negative noise threshold :param factor: float, multiplication factor. :return: float, SignalToNoise Ratio value for the peak """ spectrum = peak._parent.spectrum from ccpn.core.lib.SpectrumLib import estimateNoiseLevel1D from ccpn.core.lib.SpectrumLib import estimateSNR noiseLevel, negativeNoiseLevel = spectrum.noiseLevel, spectrum.negativeNoiseLevel if negativeNoiseLevel is None and noiseLevel is not None: negativeNoiseLevel = - noiseLevel if noiseLevel > 0 else noiseLevel * 2 spectrum.negativeNoiseLevel = negativeNoiseLevel getLogger().warning('Spectrum Negative noise not defined for %s. Estimated default' % spectrum.pid) if noiseLevel is None: # estimate it if spectrum.dimensionCount == 1: noiseLevel, negativeNoiseLevel = estimateNoiseLevel1D(spectrum.intensities) spectrum.noiseLevel, spectrum.negativeNoiseLevel = noiseLevel, negativeNoiseLevel getLogger().warning('Spectrum noise level(s) not defined for %s. Estimated default' % spectrum.pid) else: noiseLevel = spectrum.estimateNoise() negativeNoiseLevel = -noiseLevel spectrum.noiseLevel, spectrum.negativeNoiseLevel = noiseLevel, negativeNoiseLevel getLogger().warning('Spectrum noise level(s) not defined for %s. Estimated default' % spectrum.pid) if peak.height is None: updateHeight(peak) _getPeakSNRatio(peak) snr = estimateSNR(noiseLevels=[noiseLevel, negativeNoiseLevel], signalPoints=[peak.height], factor=factor) return snr[0] ## estimateSNR return a list with a lenght always > 0