#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Luca Mureddu $"
__dateModified__ = "$dateModified: 2022-01-14 19:42:25 +0000 (Fri, January 14, 2022) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Luca Mureddu $"
__date__ = "$Date: 2017-05-28 10:28:42 +0000 (Sun, May 28, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
#### GUI IMPORTS
import ccpn.AnalysisScreen.lib.experimentAnalysis.matching.MatchingDataFrames as mdf
from ccpn.AnalysisScreen.gui.widgets import HitFinderWidgets as hw
from ccpn.ui.gui.widgets.PipelineWidgets import GuiPipe
from ccpn.ui.gui.widgets.Label import Label
from ccpn.ui.gui.widgets.PulldownList import PulldownList
from ccpn.ui.gui.widgets.LineEdit import LineEdit
from ccpn.ui.gui.widgets.CheckBox import CheckBox
#### NON GUI IMPORTS
import datetime
from tqdm import tqdm
from pprint import pformat
from ccpn.util.Logging import getLogger
from ccpn.framework.lib.pipeline.PipeBase import SpectraPipe, PIPE_SCREEN
from ccpn.AnalysisScreen.lib.experimentAnalysis.matching.MatchingAlgorithms import NearestMatch,\
MatchingAlgorithmsNames, MatchingAlgorithmsDict
import ccpn.AnalysisScreen.lib.experimentAnalysis.matching.HitAnalysisLib as hLib
import ccpn.AnalysisScreen.lib.experimentAnalysis.matching.MatchingVariables as mv
from ccpn.AnalysisScreen.lib.experimentAnalysis.Common import _getReferencesFromSample
from ccpn.util.Common import makeIterableList, _getObjectsByPids, _getPidsFromObjects
from ccpn.core.Spectrum import Spectrum
########################################################################################################################
### Attributes:
### Used to set the dictionary keys of _kwargs in both the GuiPipe and the Pipe
########################################################################################################################
PipeName = 'Setup Screening dataset'

## SpectrumGroup selectors: widget variable names and _kwargs keys
ReferenceSpectrumGroup = 'Reference_SpectrumGroup'
ControlSpectrumGroup = 'Control_SpectrumGroup'
DisplacerSpectrumGroup = 'Displacer_SpectrumGroup'
TargetSpectrumGroup = 'Target_SpectrumGroup'
SGVarNames = [ReferenceSpectrumGroup, ControlSpectrumGroup, TargetSpectrumGroup, DisplacerSpectrumGroup]
# Underscored spellings are kept as aliases of the names above, so the two sets of keys cannot drift apart.
Control_SpectrumGroup = ControlSpectrumGroup
Target_SpectrumGroup = TargetSpectrumGroup

PeakListIndice = 'PeakList_Indice'
MatchingLabel = 'Matching_Engine'
RunName = 'Run_Name'
# NOTE(review): DefaultMatching and DefaultMatchingEngine (below) are two independent defaults;
# only DefaultMatchingEngine is used by the code visible in this file - confirm they are meant to agree.
DefaultMatching = NearestMatch.name
DefaultPeakListIndice = -1  # used to index spectrum.peakLists: -1 selects the last peakList
PeakProperty = 'Property'
Height = 'Height'
LW = 'LineWidths (experimental)'
Volume = 'Volume (experimental)'
PeakProperties = [Height, LW, Volume]
DefaultPeakProperty = Height
AvailableExpTypes = mv.MatchTrainingSet4Exp

## Widget variables and/or _kwargs keys
RefsFromSU = 'Use_Substance_referenceSpectra'
TrainingClassifierExpType = 'Experiment_Type'  # used for grabbing the training dataset for scoring the matches
InitialMatchingTolerance = 'Initial_matching_tolerance(ppm)'
MatchingStep = 'Increasing_matching_step(ppm)'
FinalMatchingTolerance = 'Final_matching_tolerance(ppm)'
ReferencePeakListIndex = 'Reference_PeakList'
TargetPeakListIndex = 'Target_PeakList'
DefaultRunName = 'RunName'  # this will be overwritten with a timestamp as default name
DefaultTrainingClassifierExpType = mv.F
IncludeUnmatchedSubstances = 'Include_Unmatched_Substances'

## defaults
DefaultReferencePeakListIndex = -1
DefaultTargetPeakListIndex = -1
DefaultInitialTolerance = 0.01
DefaultMatchingStep = 0.01
DefaultFinalTolerance = 1
DefaultMatchTargetOnlyOnce = False
DefaultRefsFromSU = False
DefaultMatchingEngine = 'ClosestChemicalShift'
DefaultIncludeUnmatchedSubstances = False
########################################################################################################################
########################################## ALGORITHM ########################################################
########################################################################################################################
## see more at AnalysisScreen.lib.experimentAnalysis.matching.MatchesObjects
def _createMatchDF(referencesSpectrumGroup, controlSpectrumGroup, targetSpectrumGroup, displacerSpectrumGroup=None,
                   matchingEngine=NearestMatch, useSUreferenceSpectra=False, trainingClassifierType=mv.F,
                   addUnmatchedReferences=True, addUnmatchedToReference=True, excludeBelowFigureOfMerit=0,
                   **kwargs):
    """
    Match reference peaks against control/target (and optional displacer) peaks and collect
    the results in a PeakMatches1D dataframe.

    The spectra of the groups are paired by position (zip), so iteration stops at the shortest group.
    For every spectrum the peakList at index DefaultPeakListIndice is used. A control/target pair
    whose peakLists hold a different number of peaks is logged and skipped.

    :param referencesSpectrumGroup: group holding the reference spectra. Not used when
                        useSUreferenceSpectra is True. If it is missing, an error is logged and the
                        control/target pairs are processed without any reference peak.
    :param controlSpectrumGroup: group holding the control spectra
    :param targetSpectrumGroup: group holding the target spectra
    :param displacerSpectrumGroup: optional group holding the displacer spectra
    :param matchingEngine: matching algorithm handed to hLib._getMatchedReferenceAndTargetPeaks
    :param useSUreferenceSpectra: if True, the references of each target spectrum are obtained with
                        _getReferencesFromSample instead of from referencesSpectrumGroup
    :param trainingClassifierType: handed to hLib._getMatchingScoreClassifier to get the score classifier
    :param addUnmatchedReferences: Add any reference even if it has not been matched to a Control-Target spectral peak
    :param addUnmatchedToReference: Add any Control-Target pair even if it has not been matched to a reference spectral peak
    :param excludeBelowFigureOfMerit: don't match peaks if their figure of merit is below this threshold
    :param kwargs: accepted for call compatibility; not used
    :return: the filled PeakMatches1D dataframe
    """
    peakMatches1D = mdf.PeakMatches1D()
    matchingClassifier = hLib._getMatchingScoreClassifier(trainingClassifierType=trainingClassifierType)
    getLogger().info('Matching spectral peaks')

    controlSpectra = controlSpectrumGroup.spectra
    targetSpectra = targetSpectrumGroup.spectra
    controlCount = len(controlSpectra)

    if useSUreferenceSpectra:
        ## references are resolved per target spectrum inside the loop; use placeholders to keep zip aligned
        referencesSpectra = [None] * controlCount
    elif not referencesSpectrumGroup:
        getLogger().error('References SpectrumGroup not given')
        referencesSpectra = [None] * controlCount
    else:
        referencesSpectra = referencesSpectrumGroup.spectra

    if not displacerSpectrumGroup:
        ## no displacer given: use placeholders to keep zip aligned
        displacerSpectra = [None] * controlCount
    else:
        displacerSpectra = displacerSpectrumGroup.spectra

    spectraSets = zip(referencesSpectra, controlSpectra, targetSpectra, displacerSpectra)
    for referenceSpectrum, controlSpectrum, targetSpectrum, displacerSpectrum in tqdm(spectraSets,
                                                                                      total=len(targetSpectra)):
        controlPeakList = controlSpectrum.peakLists[DefaultPeakListIndice]
        targetPeakList = targetSpectrum.peakLists[DefaultPeakListIndice]
        controlPeaks = controlPeakList.peaks
        targetPeaks = targetPeakList.peaks
        if len(controlPeaks) != len(targetPeaks):
            # Deal with a mismatch, although can lead to ambiguity.
            getLogger().error('%s: Peak count mismatched between peakLists: %s,%s. '
                              'Make sure the two peakLists have the same number of comparable peaks. Skipped.'
                              % (PipeName, controlPeakList.pid, targetPeakList.pid))
            continue

        if useSUreferenceSpectra:
            references = _getReferencesFromSample(targetSpectrum)
        else:
            references = [referenceSpectrum]
        # Drop placeholders: a None reference has no peakLists and would raise an AttributeError below.
        references = [reference for reference in references if reference is not None]
        if len(references) == 0:
            getLogger().warning('No references found for %s. Nothing to match' % str(targetSpectrum.pid))

        referencePeaks = [peak for reference in references
                          for peak in reference.peakLists[DefaultPeakListIndice].peaks]
        if displacerSpectrum:
            displacerPeaks = displacerSpectrum.peakLists[DefaultPeakListIndice].peaks
        else:
            displacerPeaks = [None] * len(controlPeaks)

        ## do the actual match and fill dataframe
        matchedReferencePeaks, matchedTargetPeaks = hLib._getMatchedReferenceAndTargetPeaks(
                peakMatches1D,
                referencePeaks, controlPeaks, targetPeaks, displacerPeaks,
                matchingClassifier,
                matchingEngine=matchingEngine,
                excludeBelowFigureOfMerit=excludeBelowFigureOfMerit)

        ## add unmatched Reference peaks to the dataset (the figureOfMerit > 0 filter is currently disabled)
        if addUnmatchedReferences:
            unmatchedReferencePeaks = [p for p in referencePeaks if p not in matchedReferencePeaks]
            for refPeak in unmatchedReferencePeaks:
                pdRow = peakMatches1D.getRowTemplate()
                hLib._fillCommonPeakProperies(pdRow, mv.Reference_, refPeak)
                peakMatches1D.loc[hLib._getUniqueID()] = pdRow  # add row to df

        ## add unmatched Target-Control peaks to the dataset (the figureOfMerit > 0 filter is currently disabled)
        if addUnmatchedToReference:
            # built once per spectrum rather than once per peak pair
            matchedTargets = set(matchedTargetPeaks)
            for controlPeak, targetPeak in zip(controlPeaks, targetPeaks):
                if targetPeak not in matchedTargets:
                    hLib._fillPartiallyPeakMatches1D(peakMatches1D, controlPeak, targetPeak)

    # now check and add missing substances in the dataset. For example because the reference spectra did not have peaks.
    if useSUreferenceSpectra:
        getLogger().info('Searching for unmatched reference spectra/substances')
        hLib._addUnmatchedSubstancesToDataFrame(peakMatches1D, controlSpectrumGroup.spectra,
                                                targetSpectrumGroup.spectra, displacerSpectra)

    # NOTE(review): the class is re-assigned before _applyID; presumably filling the dataframe
    # can change its class - confirm before removing.
    peakMatches1D.__class__ = mdf.PeakMatches1D
    peakMatches1D._applyID()
    return peakMatches1D
########################################################################################################################
########################################## GUI PIPE #############################################################
########################################################################################################################
class SetupScreeningDatasetGuiPipe(GuiPipe):
    """
    Gui front-end of the 'Setup Screening dataset' pipe.

    Every input widget is stored on the instance under the name of the _kwargs key it belongs to
    (Run_Name, the SpectrumGroup names, Matching_Engine, ...).
    """
    pipeName = PipeName

    def __init__(self, name=pipeName, parent=None, project=None, **kw):
        """
        Build the widgets of the pipe, one row of the pipeFrame grid per setting.

        :param name: name of the pipe; defaults to pipeName
        :param parent: parent object; its pipelineName is used to build the default run name
        :param project: handed to GuiPipe
        :param kw: further keywords handed to GuiPipe
        """
        super().__init__(parent=parent, name=name, project=project, **kw)
        self.parent = parent

        # default run name: <pipelineName>_<timestamp>
        date = datetime.datetime.now().strftime("%y-%m-%d-%H-%M")
        runName = self.parent.pipelineName + '_' + date

        row = 0
        Label(self.pipeFrame, RunName, grid=(row, 0))
        setattr(self, RunName, LineEdit(self.pipeFrame, text=runName, grid=(row, 1)))

        row += 1
        hw._addSGpulldowns(self, row, SGVarNames)
        # skip the rows taken by the SpectrumGroup pulldowns - presumably one row per name in SGVarNames
        row += len(SGVarNames)
        Label(self.pipeFrame, MatchingLabel, grid=(row, 0))
        setattr(self, MatchingLabel, PulldownList(self.pipeFrame, texts=MatchingAlgorithmsNames, grid=(row, 1)))

        row += 1
        Label(self.pipeFrame, TrainingClassifierExpType, grid=(row, 0))
        setattr(self, TrainingClassifierExpType, PulldownList(self.pipeFrame, texts=AvailableExpTypes, grid=(row, 1)))

        row += 1
        Label(self.pipeFrame, RefsFromSU, grid=(row, 0))
        setattr(self, RefsFromSU, CheckBox(self.pipeFrame, checked=DefaultRefsFromSU, grid=(row, 1)))

        row += 1
        Label(self.pipeFrame, IncludeUnmatchedSubstances, grid=(row, 0))
        setattr(self, IncludeUnmatchedSubstances,
                CheckBox(self.pipeFrame, checked=DefaultIncludeUnmatchedSubstances, grid=(row, 1)))

        self._updateWidgets()

    def _updateWidgets(self):
        """Update the SpectrumGroup pulldowns, adding an enabled 'None' header entry."""
        self._setSpectrumGroupPullDowns(SGVarNames, headerText='None', headerEnabled=True, )
########################################################################################################################
########################################## PIPE #############################################################
########################################################################################################################
class SetupScreeningDatasetPipe(SpectraPipe):
    """
    Match the peaks of the Reference, Control, Target (and optional Displacer) SpectrumGroups
    with _createMatchDF and store the result through hLib._newDTfromDF under the run name.
    """
    guiPipe = SetupScreeningDatasetGuiPipe
    pipeName = PipeName
    pipeCategory = PIPE_SCREEN

    _kwargs = {
        ReferenceSpectrumGroup    : 'ReferenceSpectrumGroup.pid',
        ControlSpectrumGroup      : 'ControlSpectrumGroup.pid',  # this will be replaced by the SG pid in the gui
        TargetSpectrumGroup       : 'TargetSpectrumGroup.pid',
        DisplacerSpectrumGroup    : 'DisplacerSpectrumGroup.pid',
        MatchingLabel             : DefaultMatchingEngine,
        RefsFromSU                : DefaultRefsFromSU,
        RunName                   : DefaultRunName,
        TrainingClassifierExpType : DefaultTrainingClassifierExpType,
        IncludeUnmatchedSubstances: DefaultIncludeUnmatchedSubstances,
        # TargetPeakListIndex:DefaultTargetPeakListIndex,
        # ReferencePeakListIndex:DefaultReferencePeakListIndex
        }

    def runPipe(self, spectra):
        """
        Run the matching and store the resulting dataframe as a data table.

        Nothing is computed when there is no project, or when the Control or the Target SpectrumGroup
        is missing (a warning is logged in the latter case). The input spectra are returned in every case.

        :param spectra: inputData
        :return: spectra, unchanged
        # tbd: in theory this pipe should return a df or dataset
        """
        getLogger().info(pformat(self._kwargs, compact=True))

        referencesSpectrumGroup = self._getSpectrumGroup(self._kwargs[ReferenceSpectrumGroup])
        controlSpectrumGroup = self._getSpectrumGroup(self._kwargs[ControlSpectrumGroup])
        targetSpectrumGroup = self._getSpectrumGroup(self._kwargs[TargetSpectrumGroup])
        displacerSpectrumGroup = self._getSpectrumGroup(self._kwargs[DisplacerSpectrumGroup])
        groups = [referencesSpectrumGroup, controlSpectrumGroup, targetSpectrumGroup, displacerSpectrumGroup]

        matchingEngine = MatchingAlgorithmsDict.get(self._kwargs[MatchingLabel])
        refsFromSU = self._kwargs[RefsFromSU]
        runName = self._kwargs[RunName]
        trainingClassifierType = self._kwargs[TrainingClassifierExpType]
        includeUnmatchedSubstances = self._kwargs[IncludeUnmatchedSubstances]

        if self.project is None:
            return spectra

        if controlSpectrumGroup is None or targetSpectrumGroup is None:
            getLogger().warning('Impossible to run Screening Matching without the Control or Target SpectrumGroup')
            return spectra

        if not runName:
            # fall back to <pipelineName>_<timestamp> when no run name has been given
            runName = self.pipeline.pipelineName + '_' + datetime.datetime.now().strftime("%y-%m-%d-%H-%M")

        # the same checkbox value drives both "include unmatched" options of _createMatchDF
        peakMatches1D = _createMatchDF(*groups, matchingEngine=matchingEngine,
                                       useSUreferenceSpectra=refsFromSU,
                                       trainingClassifierType=trainingClassifierType,
                                       addUnmatchedReferences=includeUnmatchedSubstances,
                                       addUnmatchedToReference=includeUnmatchedSubstances)
        sgDict = hLib._getSGDict(*groups)
        dataTable = hLib._newDTfromDF(peakMatches1D, self.project, runName, sgDict=sgDict)
        # keep the settings used for this run together with the results
        dataTable.setMetadata(mv.RunningPipelineSettings, self._kwargs)
        return spectra
# Module-level call: runs when this module is imported.
SetupScreeningDatasetPipe.register() # Registers the pipe in the pipeline