Source code for ccpn.pipes.AlignSpectra

#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Luca Mureddu, Timothy J Ragan & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2021-02-04 12:07:32 +0000 (Thu, February 04, 2021) $"
__version__ = "$Revision: 3.0.3 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Luca Mureddu $"
__date__ = "$Date: 2017-05-28 10:28:42 +0000 (Sun, May 28, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================


#### GUI IMPORTS
from ccpn.ui.gui.widgets.PipelineWidgets import GuiPipe, _getWidgetByAtt
from ccpn.ui.gui.widgets.PulldownList import PulldownList
from ccpn.ui.gui.widgets.Label import Label
from ccpn.ui.gui.widgets.GLLinearRegionsPlot import GLTargetButtonSpinBoxes

#### NON GUI IMPORTS
from ccpn.framework.lib.pipeline.PipeBase import SpectraPipe, PIPE_POSTPROCESSING
from scipy import signal
import numpy as np
from scipy import stats
from ccpn.util.Logging import getLogger
from collections import OrderedDict


########################################################################################################################
###   Attributes:
###   Used in setting the dictionary keys on _kwargs either in GuiPipe and Pipe
########################################################################################################################

ReferenceSpectrum = 'Reference_Spectrum'
PipeName = 'Align Spectra'
HeaderText = '-- Select Spectrum --'
ReferenceRegion = 'Reference_Region'
DefaultReferenceRegion = (0.5, -0.5)
EnginesVar = 'Engines'
IndividualMode = 'individual'
Median = 'median'
Mode = 'mode'
Mean = 'mean'
Engines = [IndividualMode, Mean, Mode, Median]
EnginesCallables = OrderedDict([
    ('median', np.median),
    ('mode', stats.mode),
    ('mean', np.mean),
    ])

DefaultEngine = 'median'
NotAvailable = 'Not Available'


########################################################################################################################
##########################################      ALGORITHM       ########################################################
########################################################################################################################


def _getShift(ref_x, ref_y, target_y):
    """
    :param ref_x: X array of the reference spectra (positions)
    :param ref_y: Y array of the reference spectra (intensities)
    :param target_y: Y array of the target spectra (intensities)
    :return: the shift needed to align the two spectra.
    Global alignment. This can give unpredictable results if the signal intensities are very different
    To align the target spectrum to its reference: add the shift to the x array.
    E.g. target_y += shift
    """
    return (np.argmax(signal.correlate(ref_y, target_y)) - len(target_y)) * np.mean(np.diff(ref_x))


def _getShiftForSpectra(referenceSpectrum, spectra, referenceRegion=(3, 2), engine='median'):
    """
    :param referenceSpectrum:
    :param spectra:
    :param referenceRegion: ppm regions
    :param intensityFactor:
    :param engine: one of 'median', 'mode', 'mean'
    :return: shift float
    alignment of spectra. It aligns based on a specified region of the reference spectrum
    """

    shifts = []
    point1, point2 = np.max(referenceRegion), np.min(referenceRegion)
    xRef, yRef = referenceSpectrum.positions, referenceSpectrum.intensities
    ref_x_filtered = np.where((xRef <= point1) & (xRef >= point2))  #only the region of interest for the reference spectrum

    ref_y_filtered = yRef[ref_x_filtered]
    maxYRef = np.max(ref_y_filtered)  #Find the highest signal in the region of interest for the reference spectrum. All spectra will be aligned to this point.
    boolsRefMax = yRef == maxYRef
    refIndices = np.argwhere(boolsRefMax)
    if len(refIndices) > 0:
        refPos = float(xRef[refIndices[0]])
    else:
        return [0] * len(spectra) if engine == IndividualMode else 0  # cannot align without a reference intensity signal

    #  find the shift for each spectrum for the selected region
    for sp in spectra:
        xTarg, yTarg = sp.positions, sp.intensities
        x_TargetFilter = np.where((xTarg <= point1) & (xTarg >= point2))  # filter only the region of interest for the target spectrum

        y_TargetValues = yTarg[x_TargetFilter]
        maxYTarget = np.max(y_TargetValues)
        boolsMax = yTarg == maxYTarget  #Find the highest signal in the region of interest
        indices = np.argwhere(boolsMax)
        if len(indices) > 0:
            tarPos = float(xTarg[indices[0]])
            shift = tarPos - refPos
            shifts.append(shift)

    if len(shifts) == len(spectra):
        if engine == IndividualMode:
            return shifts

    # get a common shift from all the shifts found
    if engine in EnginesCallables.keys():
        shift = EnginesCallables[engine](shifts)
        if isinstance(shift, stats.stats.ModeResult):
            shift = shift.mode[0]
            return float(shift)


[docs]def addIndividualShiftToSpectra(spectra, shifts, shiftPeaks=True): alignedSpectra = [] for sp, shift in zip(spectra, shifts): sp.positions -= shift if shiftPeaks: _shiftPeaks(sp, shift) alignedSpectra.append(sp) return alignedSpectra
[docs]def addShiftToSpectra(spectra, shift, shiftPeaks=True): alignedSpectra = [] for sp in spectra: sp.positions -= shift if shiftPeaks: _shiftPeaks(sp, shift) alignedSpectra.append(sp) return alignedSpectra
def _shiftPeaks(spectrum, shift): if spectrum.dimensionCount == 1: for peakList in spectrum.peakLists: for peak in peakList.peaks: peak.position = [peak.position[0]-shift,] ######################################################################################################################## ########################################## GUI PIPE ############################################################# ########################################################################################################################
[docs]class AlignSpectraGuiPipe(GuiPipe): pipeName = PipeName def __init__(self, name=pipeName, parent=None, project=None, **kwds): super(AlignSpectraGuiPipe, self) GuiPipe.__init__(self, parent=parent, name=name, project=project, **kwds) self._parent = parent row = 0 # Reference Spectrum self.spectrumLabel = Label(self.pipeFrame, ReferenceSpectrum, grid=(row, 0)) setattr(self, ReferenceSpectrum, PulldownList(self.pipeFrame, headerText=HeaderText, headerIcon=self._warningIcon, callback=self._estimateShift, grid=(row, 1))) row += 1 # target region self.tregionLabel = Label(self.pipeFrame, text=ReferenceRegion, grid=(row, 0)) setattr(self, ReferenceRegion, GLTargetButtonSpinBoxes(self.pipeFrame, application=self.application, values=DefaultReferenceRegion, orientation='v', decimals=4, step=0.001, grid=(row, 1))) row += 1 # Engines self.enginesLabel = Label(self.pipeFrame, EnginesVar, grid=(row, 0)) setattr(self, EnginesVar, PulldownList(self.pipeFrame, texts=Engines, grid=(row, 1))) row += 1 estimateShiftLabel = Label(self.pipeFrame, 'Estimated_shift', grid=(row, 0)) self.estimateShift = Label(self.pipeFrame, NotAvailable, grid=(row, 1)) row += 1 self._updateWidgets() def _estimateShift(self, *args): """Only to show on the Gui pipe """ referenceRegion = getattr(self, ReferenceRegion).get() engine = getattr(self, EnginesVar).getText() referenceSpectrum = getattr(self, ReferenceSpectrum).get() if not isinstance(referenceSpectrum, str): spectra = [sp for sp in self._parent.inputData if sp != referenceSpectrum] if engine == IndividualMode: self.estimateShift.clear() self.estimateShift.setText(str(NotAvailable)) else: shift = _getShiftForSpectra(referenceSpectrum, spectra, referenceRegion=referenceRegion, engine=engine) self.estimateShift.clear() self.estimateShift.setText(str(shift)) def _updateWidgets(self): self._setDataReferenceSpectrum() def _setDataReferenceSpectrum(self): data = list(self._parent.inputData) if len(data) > 0: _getWidgetByAtt(self, ReferenceSpectrum).setData(texts=[sp.pid for sp in data], objects=data, index=1, headerText=HeaderText, headerIcon=self._warningIcon) else: _getWidgetByAtt(self, ReferenceSpectrum)._clear() def _closePipe(self): 'remove the lines from plotwidget if any' _getWidgetByAtt(self, ReferenceRegion)._turnOffPositionPicking() self.closePipe()
######################################################################################################################## ########################################## PIPE ############################################################# ########################################################################################################################
[docs]class AlignSpectra(SpectraPipe): guiPipe = AlignSpectraGuiPipe pipeName = PipeName pipeCategory = PIPE_POSTPROCESSING _kwargs = { ReferenceSpectrum: 'spectrum.pid', ReferenceRegion : DefaultReferenceRegion, EnginesVar : DefaultEngine }
[docs] def runPipe(self, spectra): """ :param spectra: inputData :return: aligned spectra """ referenceRegion = self._kwargs[ReferenceRegion] engine = self._kwargs[EnginesVar] if self.project is not None: referenceSpectrumPid = self._kwargs[ReferenceSpectrum] referenceSpectrum = self.project.getByPid(referenceSpectrumPid) if referenceSpectrum is not None: spectraToAlign = [spectrum for spectrum in spectra if spectrum != referenceSpectrum] if spectraToAlign: if engine == IndividualMode: shifts = _getShiftForSpectra(referenceSpectrum, spectraToAlign, referenceRegion=referenceRegion, engine=engine) addIndividualShiftToSpectra(spectraToAlign, shifts) getLogger().info('Alignment: applied individual shift to all spectra') else: shift = _getShiftForSpectra(referenceSpectrum, spectraToAlign, referenceRegion=referenceRegion, engine=engine) addShiftToSpectra(spectraToAlign, shift) getLogger().info('Alignment: applied shift to all spectra of %s' % shift) return spectra else: getLogger().warning('Spectra not Aligned. Returned original spectra') return spectra else: getLogger().warning('Spectra not Aligned. Returned original spectra') return spectra
AlignSpectra.register() # Registers the pipe in the pipeline