#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Luca Mureddu, Timothy J Ragan & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2021-02-04 12:07:32 +0000 (Thu, February 04, 2021) $"
__version__ = "$Revision: 3.0.3 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Luca Mureddu $"
__date__ = "$Date: 2017-05-28 10:28:42 +0000 (Sun, May 28, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
#### GUI IMPORTS
from ccpn.ui.gui.widgets.PipelineWidgets import GuiPipe, _getWidgetByAtt
from ccpn.ui.gui.widgets.PulldownList import PulldownList
from ccpn.ui.gui.widgets.Label import Label
from ccpn.ui.gui.widgets.GLLinearRegionsPlot import GLTargetButtonSpinBoxes
#### NON GUI IMPORTS
from ccpn.framework.lib.pipeline.PipeBase import SpectraPipe, PIPE_POSTPROCESSING
from scipy import signal
import numpy as np
from scipy import stats
from ccpn.util.Logging import getLogger
from collections import OrderedDict
########################################################################################################################
### Attributes:
### Used in setting the dictionary keys on _kwargs either in GuiPipe and Pipe
########################################################################################################################
ReferenceSpectrum = 'Reference_Spectrum'
PipeName = 'Align Spectra'
HeaderText = '-- Select Spectrum --'
ReferenceRegion = 'Reference_Region'
DefaultReferenceRegion = (0.5, -0.5)
EnginesVar = 'Engines'
IndividualMode = 'individual'
Median = 'median'
Mode = 'mode'
Mean = 'mean'
Engines = [IndividualMode, Mean, Mode, Median]
EnginesCallables = OrderedDict([
('median', np.median),
('mode', stats.mode),
('mean', np.mean),
])
DefaultEngine = 'median'
NotAvailable = 'Not Available'
########################################################################################################################
########################################## ALGORITHM ########################################################
########################################################################################################################
def _getShift(ref_x, ref_y, target_y):
"""
:param ref_x: X array of the reference spectra (positions)
:param ref_y: Y array of the reference spectra (intensities)
:param target_y: Y array of the target spectra (intensities)
:return: the shift needed to align the two spectra.
Global alignment. This can give unpredictable results if the signal intensities are very different
To align the target spectrum to its reference: add the shift to the x array.
E.g. target_y += shift
"""
return (np.argmax(signal.correlate(ref_y, target_y)) - len(target_y)) * np.mean(np.diff(ref_x))
def _getShiftForSpectra(referenceSpectrum, spectra, referenceRegion=(3, 2), engine='median'):
"""
:param referenceSpectrum:
:param spectra:
:param referenceRegion: ppm regions
:param intensityFactor:
:param engine: one of 'median', 'mode', 'mean'
:return: shift float
alignment of spectra. It aligns based on a specified region of the reference spectrum
"""
shifts = []
point1, point2 = np.max(referenceRegion), np.min(referenceRegion)
xRef, yRef = referenceSpectrum.positions, referenceSpectrum.intensities
ref_x_filtered = np.where((xRef <= point1) & (xRef >= point2)) #only the region of interest for the reference spectrum
ref_y_filtered = yRef[ref_x_filtered]
maxYRef = np.max(ref_y_filtered) #Find the highest signal in the region of interest for the reference spectrum. All spectra will be aligned to this point.
boolsRefMax = yRef == maxYRef
refIndices = np.argwhere(boolsRefMax)
if len(refIndices) > 0:
refPos = float(xRef[refIndices[0]])
else:
return [0] * len(spectra) if engine == IndividualMode else 0 # cannot align without a reference intensity signal
# find the shift for each spectrum for the selected region
for sp in spectra:
xTarg, yTarg = sp.positions, sp.intensities
x_TargetFilter = np.where((xTarg <= point1) & (xTarg >= point2)) # filter only the region of interest for the target spectrum
y_TargetValues = yTarg[x_TargetFilter]
maxYTarget = np.max(y_TargetValues)
boolsMax = yTarg == maxYTarget #Find the highest signal in the region of interest
indices = np.argwhere(boolsMax)
if len(indices) > 0:
tarPos = float(xTarg[indices[0]])
shift = tarPos - refPos
shifts.append(shift)
if len(shifts) == len(spectra):
if engine == IndividualMode:
return shifts
# get a common shift from all the shifts found
if engine in EnginesCallables.keys():
shift = EnginesCallables[engine](shifts)
if isinstance(shift, stats.stats.ModeResult):
shift = shift.mode[0]
return float(shift)
[docs]def addIndividualShiftToSpectra(spectra, shifts, shiftPeaks=True):
alignedSpectra = []
for sp, shift in zip(spectra, shifts):
sp.positions -= shift
if shiftPeaks:
_shiftPeaks(sp, shift)
alignedSpectra.append(sp)
return alignedSpectra
[docs]def addShiftToSpectra(spectra, shift, shiftPeaks=True):
alignedSpectra = []
for sp in spectra:
sp.positions -= shift
if shiftPeaks:
_shiftPeaks(sp, shift)
alignedSpectra.append(sp)
return alignedSpectra
def _shiftPeaks(spectrum, shift):
if spectrum.dimensionCount == 1:
for peakList in spectrum.peakLists:
for peak in peakList.peaks:
peak.position = [peak.position[0]-shift,]
########################################################################################################################
########################################## GUI PIPE #############################################################
########################################################################################################################
[docs]class AlignSpectraGuiPipe(GuiPipe):
pipeName = PipeName
def __init__(self, name=pipeName, parent=None, project=None, **kwds):
super(AlignSpectraGuiPipe, self)
GuiPipe.__init__(self, parent=parent, name=name, project=project, **kwds)
self._parent = parent
row = 0
# Reference Spectrum
self.spectrumLabel = Label(self.pipeFrame, ReferenceSpectrum, grid=(row, 0))
setattr(self, ReferenceSpectrum, PulldownList(self.pipeFrame, headerText=HeaderText,
headerIcon=self._warningIcon, callback=self._estimateShift, grid=(row, 1)))
row += 1
# target region
self.tregionLabel = Label(self.pipeFrame, text=ReferenceRegion, grid=(row, 0))
setattr(self, ReferenceRegion, GLTargetButtonSpinBoxes(self.pipeFrame, application=self.application,
values=DefaultReferenceRegion, orientation='v',
decimals=4,
step=0.001,
grid=(row, 1)))
row += 1
# Engines
self.enginesLabel = Label(self.pipeFrame, EnginesVar, grid=(row, 0))
setattr(self, EnginesVar, PulldownList(self.pipeFrame, texts=Engines, grid=(row, 1)))
row += 1
estimateShiftLabel = Label(self.pipeFrame, 'Estimated_shift', grid=(row, 0))
self.estimateShift = Label(self.pipeFrame, NotAvailable, grid=(row, 1))
row += 1
self._updateWidgets()
def _estimateShift(self, *args):
"""Only to show on the Gui pipe """
referenceRegion = getattr(self, ReferenceRegion).get()
engine = getattr(self, EnginesVar).getText()
referenceSpectrum = getattr(self, ReferenceSpectrum).get()
if not isinstance(referenceSpectrum, str):
spectra = [sp for sp in self._parent.inputData if sp != referenceSpectrum]
if engine == IndividualMode:
self.estimateShift.clear()
self.estimateShift.setText(str(NotAvailable))
else:
shift = _getShiftForSpectra(referenceSpectrum, spectra,
referenceRegion=referenceRegion, engine=engine)
self.estimateShift.clear()
self.estimateShift.setText(str(shift))
def _updateWidgets(self):
self._setDataReferenceSpectrum()
def _setDataReferenceSpectrum(self):
data = list(self._parent.inputData)
if len(data) > 0:
_getWidgetByAtt(self, ReferenceSpectrum).setData(texts=[sp.pid for sp in data], objects=data, index=1,
headerText=HeaderText, headerIcon=self._warningIcon)
else:
_getWidgetByAtt(self, ReferenceSpectrum)._clear()
def _closePipe(self):
'remove the lines from plotwidget if any'
_getWidgetByAtt(self, ReferenceRegion)._turnOffPositionPicking()
self.closePipe()
########################################################################################################################
########################################## PIPE #############################################################
########################################################################################################################
[docs]class AlignSpectra(SpectraPipe):
guiPipe = AlignSpectraGuiPipe
pipeName = PipeName
pipeCategory = PIPE_POSTPROCESSING
_kwargs = {
ReferenceSpectrum: 'spectrum.pid',
ReferenceRegion : DefaultReferenceRegion,
EnginesVar : DefaultEngine
}
[docs] def runPipe(self, spectra):
"""
:param spectra: inputData
:return: aligned spectra
"""
referenceRegion = self._kwargs[ReferenceRegion]
engine = self._kwargs[EnginesVar]
if self.project is not None:
referenceSpectrumPid = self._kwargs[ReferenceSpectrum]
referenceSpectrum = self.project.getByPid(referenceSpectrumPid)
if referenceSpectrum is not None:
spectraToAlign = [spectrum for spectrum in spectra if spectrum != referenceSpectrum]
if spectraToAlign:
if engine == IndividualMode:
shifts = _getShiftForSpectra(referenceSpectrum, spectraToAlign,
referenceRegion=referenceRegion, engine=engine)
addIndividualShiftToSpectra(spectraToAlign, shifts)
getLogger().info('Alignment: applied individual shift to all spectra')
else:
shift = _getShiftForSpectra(referenceSpectrum, spectraToAlign,
referenceRegion=referenceRegion, engine=engine)
addShiftToSpectra(spectraToAlign, shift)
getLogger().info('Alignment: applied shift to all spectra of %s' % shift)
return spectra
else:
getLogger().warning('Spectra not Aligned. Returned original spectra')
return spectra
else:
getLogger().warning('Spectra not Aligned. Returned original spectra')
return spectra
AlignSpectra.register() # Registers the pipe in the pipeline