"""Functions for calculating axisCodes for NmrExpPrototypes, adn necessary utilities.
For normal cases, use only refExpFimRefCodeMap function, and get axisCodes from
refExpDimRefs and the map.
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2017"
__credits__ = ("Wayne Boucher, Ed Brooksbank, Rasmus H Fogh, Luca Mureddu, Timothy J Ragan & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license",
"or ccpnmodel.ccpncore.memops.Credits.CcpnLicense for licence text")
__reference__ = ("For publications, please use reference from http://www.ccpn.ac.uk/v3-software/downloads/license",
"or ccpnmodel.ccpncore.memops.Credits.CcpNmrReference")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: CCPN $"
__dateModified__ = "$dateModified: 2017-07-07 16:33:14 +0100 (Fri, July 07, 2017) $"
__version__ = "$Revision: 3.0.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:48 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
import operator
import collections
from typing import Dict
from ccpn.util import Constants
# Dictionary of experiment names that should come first in the list
# - with optional remapping of names
priorityNameRemapping = {
# C,H experiments
'13C HSQC/HMQC':'13C HSQC/HMQC',
# C,H,H experiments
'13C NOESY-HSQC':'13C NOESY-HSQC',
'13C HSQC-NOESY':'13C HSQC-NOESY',
'HcCH-TOCSY':'HcCH-TOCSY',
'HCcH-TOCSY':'HCcH-TOCSY',
'HcCH-COSY':'HcCH-COSY',
'HCcH-COSY':'HCcH-COSY',
# H,N experiments
'15N HSQC/HMQC':'15N HSQC/HMQC',
# H,H,N experiments
'15N HSQC-TOCSY':'15N HSQC-TOCSY',
'15N TOCSY-HSQC':'15N TOCSY-HSQC',
'15N NOESY-HSQC':'15N NOESY-HSQC',
'15N HSQC-NOESY':'15N HSQC-NOESY',
'HBcb/HAcacoNH':'HB/HAcoNH',
# C,H,N experiments
'HNCA/CB':'HNCA/CB',
'hCca-TOCSY-coNH':'hCcacoNH-TOCSY',
'Hcca-TOCSY-coNH':'HccacoNH-TOCSY',
'hCc-TOCSY-NH':'hCcaNH-TOCSY',
'Hcc-TOCSY-NH':'HccaNH-TOCSY',
'HNcoCA/CB':'HNcoCA/CB',
'HNCA':'HNCA',
'HNcaCO':'HNcaCO',
'HNcoCA':'HNcoCA',
'HNCO':'HNCO',
'hbCB/haCAcoNH':'CB/CAcoNH',
}
[docs]def resetAllAxisCodes(nmrProject):
"""Reset all axisCodes (ExpDimRef.name) in project to be unique, match the isotope,
and match the standard Prototype where a prototype is known"""
stdCodeMap = refExpDimRefCodeMap(nmrProject.root)
for experiment in nmrProject.sortedExperiments():
if experiment.refExperiment is None:
# No prototype - just use nucleus as axis code
foundCodes = {}
for expDim in experiment.expDims:
for expDimRef in expDim.expDimRefs:
isotopeCodes = expDimRef.isotopeCodes
# Get axis code
if expDimRef.measurementType.lower() == 'shift' and len(isotopeCodes) == 1:
# Normal shift axis, use nucleus to set
code = isotope2Nucleus(isotopeCodes[0])
else:
# Different type of axis.
# For simplicity set to axis_1, axis_2, ... for now
# FIXME: no print!
print ("WARNING, non-std axis - axisCode set to 'axis'")
code = 'axis'
foundCodes[code] = 0
# add index for duplicates
indx = foundCodes.get(code)
if indx is None:
foundCodes[code] = 0
else:
indx += 1
foundCodes[code] = indx
code = '%s%s' % (code, str(indx))
# Set the attribute
expDimRef.name = code
else:
# We have a prototype - use standard axisCode map
for expDim in experiment.expDims:
for expDimRef in expDim.expDimRefs:
expDimRef.name = stdCodeMap[expDimRef.refExpDimRef]
[docs]def refExpDimRefCodeMap(project):
"""get RefExpDimRef: axisCode map for all NmrExpPrototypes in project"""
result = {}
for nxp in project.sortedNmrExpPrototypes():
for isReversed in False, True:
refExperiments = nxp.findAllRefExperiments(isReversed=isReversed)
if refExperiments:
measurementMap = _measurementCodeMap(nxp, forReversed=isReversed)
for re in refExperiments:
for red in re.refExpDims:
for redr in red.refExpDimRefs:
result[redr] = measurementMap[redr.expMeasurement]
#
return result
def _measurementCodeMap(nmrExpPrototype, forReversed=False):
"""get expMeasurement:axisCode map"""
result = {}
measurements = _orderedMeasurements(nmrExpPrototype, forReversed=forReversed)
foundCodes = {}
# get axisCodes per expMeasurement
for measurement in measurements:
code = rawAxisCode(measurement)
indx = foundCodes.get(code)
if indx is None:
foundCodes[code] = 0
else:
indx += 1
foundCodes[code] = indx
code = '%s%s' % (code, indx)
result[measurement] = code
#
return result
def _connectedShiftMeasurements(expMeasurement):
"""find expMeasurements that are of type Shift, match a HX bond, and onebonded to the input measurement"""
result = []
nmrExpPrototype = expMeasurement.nmrExpPrototype
if expMeasurement.measurementType.lower() != 'shift':
return result
atomSites = expMeasurement.atomSites
if len(atomSites) != 1:
# FIXME: no print!
print ("WARNING%s Shift must have single AtomSite, has: %s"
% (expMeasurement.nmrExpPrototype.name, [x.name for x in atomSites]))
if not atomSites:
return result
expSite = atomSites[0]
expIsotope = expSite.isotopeCode
sites = []
if expIsotope in ('1H', '19F'):
for et in expSite.findAllExpTransfers(transferType='onebond'):
for site in et.atomSites:
if site is not expSite:
if site.isotopeCode in ('13C', '15N') and site.maxNumber != 0:
sites.append(site)
break
elif expIsotope in ('13C', '15N'):
for et in expSite.findAllExpTransfers(transferType='onebond'):
for site in et.atomSites:
if site is not expSite:
if site.isotopeCode in ('1H', '19F') and site.maxNumber != 0:
sites.append(site)
break
result = [x for x in nmrExpPrototype.expMeasurements if x.measurementType.lower() == 'shift'
and x.findFirstAtomSite() in sites]
#
return result
def _orderedMeasurements(nmrExpPrototype, forReversed=False):
"""get ExpMeasurements in order: acquisition first, furthest from acquisition last,
connected measurements grouped together. If forReversed, get with acquisition last
(for reversed experiments"""
ll = [tt[1].expMeasurement
for tt in sorted((x.stepNumber,x)
for x in nmrExpPrototype.findFirstExpGraph().expSteps)]
if not forReversed:
ll.reverse()
measurements = []
for measurement in ll:
if measurement not in measurements:
measurements.append(measurement)
for me in _connectedShiftMeasurements(measurement):
if me not in measurements:
measurements.append(me)
# we do not search all expGraphs, so just add missing measurements to the end
# this is a heuristic, not a rigid ordering
for measurement in nmrExpPrototype.expMeasurements:
if measurement not in measurements:
measurements.append(measurement)
for me in _connectedShiftMeasurements(measurement):
if me not in measurements:
measurements.append(me)
#
return measurements
[docs]def rawAxisCode(expMeasurement):
"""Get raw expMeasurement axisCode (without number suffixes) from NmrExpPrototype.ExpMeasurement"""
em = expMeasurement
emType = em.measurementType.lower()
tag = Constants.measurementType2ElementCode.get(emType, emType)
if tag == 'delay':
result = tag
elif tag == 'shift':
result = ''.join(sorted(atomSiteAxisCode(x) for x in em.atomSites))
else:
result = tag + ''.join(sorted(isotope2Nucleus(x.isotopeCode).lower() for x in em.atomSites))
#
return result
# def nucleusCode(expMeasurement):
# """Get nucleusCode from NmrExpPrototype.ExpMeasurement"""
# from ccpn.util import Constants
#
# em = expMeasurement
# emType = em.measurementType.lower()
#
# tag = Constants.measurementType2ElementCode.get(emType, emType)
# if tag == 'delay':
# result = tag
# else:
# records = [Constants.isotopeRecords.get(x.isotopeCode.upper() for x in em.atomSites)]
# if None in records:
# raise ValueError("Invalid NmrExpPrototype, unknown isotopeCode for atomSite at %s "
# % expMeasurement)
# else:
# result = ''.join(sorted(x.symbol for x in records))
# if tag != 'shift':
# result = tag + result.lower()
# #
# return result
[docs]def isotope2Nucleus(isotopeCode):
"""remove integer prefix from integer+string string"""
from ccpn.util.isotopes import isotopeRecords
record = isotopeRecords.get(isotopeCode)
if record is None:
raise ValueError("Isotope %s not recognised")
else:
return record.symbol.upper()
[docs]def atomSiteAxisCode(atomSite):
"""Get axisCode (without number suffixes) from NmrExPrototype.AtomSite"""
name = atomSite.name
result = nucleus = isotope2Nucleus(atomSite.isotopeCode)
if name == 'CA':
result = 'CA'
elif name == 'CO':
result = 'CO'
elif nucleus in 'HF':
# H or F, check if bound to C or N
ll = []
for et in atomSite.findAllExpTransfers(transferType='onebond'):
for site in et.atomSites:
if site is not atomSite:
if site.isotopeCode == '13C':
ss = 'c'
elif site.isotopeCode == '15N':
ss = 'n'
else:
break
if site.maxNumber == 0:
ll.append('-'+ss)
elif site.minNumber > 0:
ll.append(ss)
break
# In rare cases we get both - this is nonsense, so remove them
for tags in ('n', '-n'),('c','-c'):
if tags[0] in ll and tags[1] in ll:
for tag in tags:
while tag in ll:
ll.remove(tag)
# Now remove the '-x' tags - they were there to catch the above problem
for tag in ('-n', '-c'):
while tag in ll:
ll.remove(tag)
result += ''.join(sorted(set(ll)))
elif nucleus in 'CN':
ll = []
for et in atomSite.findAllExpTransfers(transferType='onebond'):
for site in et.atomSites:
if site is not atomSite:
if site.isotopeCode == '1H':
ss = 'h'
elif site.isotopeCode == '19F':
ss = 'f'
else:
break
if site.maxNumber == 0:
ll.append('-'+ss)
elif site.minNumber > 0:
ll.append(ss)
break
# In rare cases we get both - this is nonsense, so remove them
for tags in ('h', '-h'),('f','-f'):
if tags[0] in ll and tags[1] in ll:
for tag in tags:
while tag in ll:
ll.remove(tag)
# Now remove the '-x' tags - they were there to catch the above problem
for tag in ('-h', '-f'):
while tag in ll:
ll.remove(tag)
result += ''.join(sorted(set(ll)))
#
return result
[docs]def testExpPrototypes(resetCodes=False):
"""Test functions and make diagnostic output"""
from ccpnmodel.ccpncore.lib.Io import Api as apiIo
project = apiIo.newProject("ExpPrototypeTest", overwriteExisting=True)
codeMap = refExpDimRefCodeMap(project)
axisCodeSet = set()
useNames = {}
synonyms= []
for nmrExpPrototype in project.sortedNmrExpPrototypes():
# for nmrExpPrototype in project.findAllNmrExpPrototypes(serial=292):
# print("nmrExpPrototype: %s %s " % (nmrExpPrototype, nmrExpPrototype._ID))
for refExperiment in nmrExpPrototype.sortedRefExperiments():
# print("refExperiment: %s %s " % (refExperiment, refExperiment._ID))
# check axis codes
codes = []
for red in refExperiment.sortedRefExpDims():
# print("red: %s %s" % (red, red._ID))
for redr in red.sortedRefExpDimRefs():
# print("redr: %s %s " % ( redr, redr._ID))
code = codeMap[redr]
codes.append(code)
axisCodeSet.add(code)
if len(codes) == len(set(codes)):
if resetCodes:
# Codes are unique - set them
for red in refExperiment.sortedRefExpDims():
for redr in red.sortedRefExpDimRefs():
redr.axisCode = codeMap[redr]
else:
# FIXME: no print!
print ("Duplicate code in %s: %s. SKIPPING" % (refExperiment.name, codes))
# print ("TEST %s %s" % (refExperiment.name, codes))
# check for duplicate synonyms and collect all synonyms
key = refExperiment.name
usename = refExperiment.synonym or key
ll = useNames.get(usename, [])
ll.append(key)
useNames[usename] = ll
if usename != key:
synonyms.append((usename, key))
# FIXME: no print!
print ("All axisCodes: %s" % list(sorted(axisCodeSet)))
for key, val in sorted(useNames.items()):
if len(val) > 1:
# FIXME: no print!
print ("Duplicate name: %s %s" % (key, val))
# for tt in sorted(synonyms):
# print ("SYNONYM: %s %s" % tt)
if resetCodes:
project.saveModified()
[docs]def experimentSynonymSummary():
"""1) set of atomSiteNames appearing
2) List of dimensionCount, synonym, name, atomNames for refExperiments sorted by name """
from ccpnmodel.ccpncore.lib.Io import Api as apiIo
project = apiIo.newProject("ExpPrototypeTest", overwriteExisting=True)
# NBNB
# 1) Some refExperiments have no synonym - use the name instead for these
result= []
# atomSiteNames = set()
allAxisCodes = set()
# Sort refExperiments by name
refExperiments = list(sorted([x for y in project.nmrExpPrototypes for x in y.refExperiments],
key=operator.attrgetter('name')))
for refExperiment in refExperiments:
# print("refExperiment: %s %s " % (refExperiment, refExperiment._ID))
typ = 'NORM'
# siteNames = []
axisCodes = []
synonym = refExperiment.synonym
for red in refExperiment.sortedRefExpDims():
ll = [x.axisCode for x in red.sortedRefExpDimRefs()]
if len(ll) == 1:
axisCodes.extend(ll)
allAxisCodes.add(ll[0])
else:
axisCodes.append(str(ll))
typ = 'CPLX'
data = [typ, len(refExperiment.refExpDims), synonym or refExperiment.name,
refExperiment.name, axisCodes]
result.append(data)
# ss = ','.join(tuple(x.name for x in redr.expMeasurement.atomSites))
# siteNames.append(ss)
# atomSiteNames.add(ss)
#
return allAxisCodes, result
[docs]def fetchIsotopeRefExperimentMap(project:'MemopsRoot') -> Dict:
"""fetch {tuple(sortedNucleusCodes):RefExperiment} dictionary for project
The key is a tuple of element names ('C, Br, H, D, T, ...) or either of J, MQ, ALT, or delay
NB, each list value is sorted ad-hoc to bring the most common experiments to the top.
Do NOT sort or reorder the result"""
result = {}
if hasattr(project, '_isotopeRefExperimentMap'):
result = project._isotopeRefExperimentMap
if not result:
tuples = []
for expPrototype in project.sortedNmrExpPrototypes():
expGraph = expPrototype.sortedExpGraphs()[0]
expSteps = sorted(expGraph.expSteps, key=operator.attrgetter('stepNumber'))
atomSiteCount = len(expPrototype.atomSites)
for refExperiment in expPrototype.sortedRefExperiments():
# ExpSteps in order of traversal
expMeasurements = list(x.expMeasurement for x in expSteps)
if refExperiment.isReversed:
expMeasurements.reverse()
# We now have expMeasurements in traversal order, from start to acquisition
# Sort key for simple (e.g. shift)
# versus compound (e.g. coupling, MQ, or reduced-dimensionality) axes
axisCodes = refExperiment.axisCodes
multiAxisCodeSort = 100 if None in axisCodes else max(x.count(',') for x in axisCodes)
# Sort key for start and acquisition dimensions
ss = expMeasurements[-1].atomSites[0].isotopeCode
for acqNucleusCode in ss:
if not acqNucleusCode.isdigit():
break
if acqNucleusCode == 'H':
ll = list(expMeasurements[0].atomSites)
startsOnProton = (ll[0].isotopeCode == '1H' if len(ll) == 1 else False)
if startsOnProton:
# Proton start and end -= sort first
nucleusAcquisitionSort = -30
else:
# Proton end only - sort third
nucleusAcquisitionSort = -10
elif acqNucleusCode == 'C':
# Carbon detection - sort second
nucleusAcquisitionSort = -20
else:
# Other. Sort last, grouping by acquisition nucleus
nucleusAcquisitionSort = ord(acqNucleusCode)
refExperimentName = refExperiment.name
# Ad-hoc - give absolute priority for preferred experiments
if (refExperiment.synonym or refExperimentName) in priorityNameRemapping:
multiAxisCodeSort = -100
# Ad hoc modifications - to get more common experiments first:
downgrade = ('(', 'base','coupling', '[n')
adhoc = any(x in refExperimentName for x in downgrade)
if '{C' in refExperimentName:
# Give higher priority to CA|Cca experiments
atomSiteCountMod = atomSiteCount -1
if 'co' in refExperimentName.lower():
# Move CB/CA CO experiments yet one more level up
atomSiteCountMod -= 0.5
else:
atomSiteCountMod = atomSiteCount
# Put result on tuples list, with sorting keys in position
key = tuple(sorted(refExperiment.nucleusCodes))
tt = (key, multiAxisCodeSort, nucleusAcquisitionSort, adhoc, atomSiteCountMod,
len(refExperimentName),refExperiment)
tuples.append(tt)
# Sort tuples by arranged sort codes
tuples.sort()
for tt in tuples:
# Get actual result out and in dict
refExperiment = tt[-1]
key = tt[0]
ll = result.get(key, [])
ll.append(refExperiment)
result[key] = ll
# print(key, refExperiment.name, refExperiment.synonym)
project._isotopeRefExperimentMap = result
#
return result
# ExperimentClassification namedtuple, showing which groups a given RefExperiment falls into
ExperimentClassification = collections.namedtuple('ExperimentCharacteristic',
('dimensionCount', 'acquisitionNucleus',
'isThroughSpace','isRelayed',
'isRelaxation', 'isQuantification', 'isJResolved',
'isMultipleQuantum', 'isProjection',
'name', 'synonym'))
[docs]def getExpClassificationDict(nmrProject) -> dict:
"""
Get a dictionary of dictionaries of dimensionCount:sortedNuclei:ExperimentClassification named tuples.
"""
classificationDict = {}
for nmrExpProtoType in nmrProject.root.sortedNmrExpPrototypes():
for refExperiment in nmrExpProtoType.sortedRefExperiments():
dimensionCount = len(refExperiment.refExpDims)
if dimensionCount not in classificationDict.keys():
classificationDict[dimensionCount] = {}
nucleusCodes = tuple(sorted(refExperiment.nucleusCodes))
if nucleusCodes not in classificationDict[dimensionCount].keys():
classificationDict[dimensionCount][nucleusCodes] = []
classification = getExperimentClassification(refExperiment)
classificationDict[dimensionCount][nucleusCodes].append(classification)
return classificationDict
[docs]def getExperimentClassification(refExperiment:'RefExperiment') -> ExperimentClassification:
"""Get ExperimentClassification namedtuple, showing which groups a given RefExperiment falls into
The field names should be self-explanatory, except for 'isQuantification' - this covers
experiments that are classified as 'quantification' in the NmrExpPrototype description, and are
*not* relaxation measurements. In practice these are J-measurement experiments (for now)."""
nmrExpPrototype = refExperiment.nmrExpPrototype
transferTypes = set(y.transferType for x in nmrExpPrototype.expGraphs for y in x.expTransfers)
measurementTypes = set(x.measurementType for x in nmrExpPrototype.expMeasurements)
dimensionCount = len(refExperiment.refExpDims)
expMeasurement = _orderedMeasurements(nmrExpPrototype, forReversed=refExperiment.isReversed)[0]
acquisitionNucleus = expMeasurement.atomSites[0].isotopeCode
isThroughSpace = 'through-space' in transferTypes
isRelayed = ('relayed' in transferTypes or 'relayed-alternate' in transferTypes)
isRelaxation = (nmrExpPrototype.category == 'quantification' and any(x for x in measurementTypes
if x.startswith('T')))
isQuantification = (nmrExpPrototype.category == 'quantification' and not isRelaxation)
isJResolved = 'JCoupling' in measurementTypes
isMultipleQuantum = 'MQShift' in measurementTypes
isProjection = ('.2D.' in refExperiment.name or '.3D.' in refExperiment.name)
name = refExperiment.name
synonym = refExperiment.synonym or name
# NEW - bug fix:
synonym = priorityNameRemapping.get(synonym, synonym)
result = ExperimentClassification(dimensionCount, acquisitionNucleus, isThroughSpace, isRelayed,
isRelaxation, isQuantification, isJResolved, isMultipleQuantum,
isProjection, name, synonym)
#
return result
[docs]def testExperimentFilter(project):
allData = {}
counters = {}
for nxp in project.sortedNmrExpPrototypes():
for rx in nxp.sortedRefExperiments():
filterData = getExperimentClassification(rx)
allData[rx.name] = filterData
fields = list(allData.values())[0]._fields
byColumns = list(zip(*allData.values()))
for ii,field in enumerate(fields):
counters[field] = collections.Counter(byColumns[ii])
#
return counters
if __name__ == '__main__':
pass
counter = collections.Counter()
from ccpnmodel.ccpncore.lib.Io.Api import newProject
project = newProject("ExpPrototypeTest", overwriteExisting=True)
for cc in project.sortedChemComps():
counter['ChemComp'] += 1
for ns in cc.namingSystems:
counter[ns.name] += 1
if False: #cc.className == 'StdChemComp':
for ccv in cc.sortedChemCompVars():
if ccv.isDefaultVar:
# print ('@~@~', cc.molType, cc.code3Letter, cc.code1Letter, ccv.linking, ccv.descriptor)
ll = sorted(x.name for x in ccv.chemAtoms if x.className == 'ChemAtom' and x.name.startswith('H'))
print ('\t'.join((cc.molType, cc.code3Letter, str(cc.code1Letter), str(ccv.isDefaultVar),
ccv.linking, ccv.descriptor, str(ll))))
# for item in sorted(testExperimentFilter(project).items()):
# pass
# # print ('\n%s:\n%s' % item)
# counter = collections.Counter()
# for nxp in project.nmrExpPrototypes:
# for xx in nxp.atomSites:
# counter[(xx.name, xx.isotopeCode)] += 1
# for item in sorted(counter.items()):
# print ('@~@~', item)
dd = {}
ala = project.findFirstChemComp(code3Letter='ALA')
for ns in ala.namingSystems:
dd[ns.name] = ll = []
for asn in ns.atomSysNames:
name = asn.atomName
sysName = asn.sysName
if name != sysName:
ll.append((name, sysName))
for key, ll in sorted(dd.items()):
print ('-->', key, ll)