"""
Sparky project reader: parse Sparky .proj/.save files and import spectra,
peak lists and molecular chains into a CCPN project.
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2021-08-04 12:28:19 +0100 (Wed, August 04, 2021) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2018-12-20 15:44:35 +0000 (Thu, December 20, 2018) $"
#=========================================================================================
# Start of code
#=========================================================================================
import os
import time
import re
import collections
import pandas as pd
import itertools
from ccpn.util.isotopes import DEFAULT_ISOTOPE_DICT
try:
    # Python 3
    from itertools import zip_longest
except ImportError:
    # Python 2.7
    from itertools import izip_longest as zip_longest
# from datetime import datetime
from collections import OrderedDict
from ccpn.core.Project import Project
from ccpn.util.Path import aPath
from ccpn.util.Logging import getLogger
from ccpn.util.nef.GenericStarParser import NamedOrderedDict
sparkyReadingOrder = []
# possibly for later
sparkyWritingOrder = ([x for x in sparkyReadingOrder if x.startswith('sparky_')] +
[x for x in sparkyReadingOrder if not x.startswith('sparky_')])
_isALoop = ()
# not sure how to use this yet
sparky2CcpnMap = {}
# STAR parsing REGEX, following International Tables for Crystallography volume G section 2.1
_SPARKY_REGEX = r"""(?xmi) # $Revision$ # No 'u' flag for perl 5.8.8/RHEL5 compatibility
^;([\S\s]*?)(?:\r\n|\s)^;(?:(?=\s)|$) # 1 Multi-line string
|(?:^|(?<=\s))(\#.*?)\r?$ # 2 Comment
|(?:^|(?<=\s))(?:(global_)                             # 3 STAR 'global_' keyword
|<sparky\s(.*?)> # 4 Sparky project start
|(\$\S+) # 5 STAR save frame reference
|<end\s(.*?)> # 6 block terminator
|<(version\s.*?)> # 7 block start
|<(.*?)> # 8 block header - shouldn't need the leading whitespace
|type\s(.*?) # 9 type header
|([\[]\S*) # 10 type subheader
|([\]]\S*) # 11 type subblock terminator
|((?:global_\S+)|(?:stop_\S+)|(?:data_)|(?:loop_\S+)) # 12 Invalid privileged construct
|(_\S+) # 13 Data name
|'(.*?)' # 14 Single-quoted string
|"(.*?)" # 15 Double-quoted string
|(\.) # 16 CIF null
|(\?) # 17 CIF unknown/missing
|((?:[^'";_$\s]|(?!^);).*)(?<![ ]) # 18 Non-quoted string - exclude pre/post whitespace
|([\[\]]\S*) # 19 Square bracketed constructs (reserved)
|(\S+) # 20 Catch-all bad token
)
(?:(?=\s)|$)"""
# Compiled form of _SPARKY_REGEX
_sparky_pattern = re.compile(_SPARKY_REGEX, re.UNICODE)
# Token types. NB numbers must be synced to regex - these are used directly!!!
SP_TOKEN_MULTILINE = 1
SP_TOKEN_COMMENT = 2
SP_TOKEN_GLOBAL = 3
SP_TOKEN_SPARKY_PROJECT = 4
SP_TOKEN_SAVE_FRAME_REF = 5
SP_TOKEN_END_SPARKY_DICT = 6
SP_TOKEN_VERSION = 7
SP_TOKEN_SPARKY_DICT = 8
SP_TOKEN_TYPE_DICT = 9
SP_TOKEN_TYPE_NESTED = 10
SP_TOKEN_END_NESTED = 11
SP_TOKEN_BAD_CONSTRUCT = 12
SP_TOKEN_DATA_NAME = 13
SP_TOKEN_SQUOTE_STRING = 14
SP_TOKEN_DQUOTE_STRING = 15
SP_TOKEN_NULL = 16
SP_TOKEN_UNKNOWN = 17
SP_TOKEN_STRING = 18
SP_TOKEN_SQUARE_BRACKET = 19
SP_TOKEN_BAD_TOKEN = 20
SparkyToken = collections.namedtuple('SparkyToken', ('type', 'value'))
PEAK_TYPE = 'type'
PEAK_PEAK = 'peak'
PEAK_MAXSEARCH = 10
PEAK_POS = 'pos'
PEAK_RESONANCE = 'rs'
PEAK_HEIGHT = 'height'
PEAK_LINEWIDTH = 'linewidth'
PEAK_POSNUM = 1
PEAK_RESONANCENUM = 2
PEAK_HEIGHTNUM = 4
PEAK_LINEWIDTHNUM = 8
PEAK_ALLFOUND = PEAK_POSNUM | PEAK_RESONANCENUM | PEAK_HEIGHTNUM | PEAK_LINEWIDTHNUM
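# Illustrative sketch (not part of the original reader): the PEAK_*NUM
# constants are single-bit flags that the peak-list importer ORs together as
# each field of a peak is found, then tests with a bitwise AND.  A minimal
# example, assuming only position and height were parsed:
def _examplePeakFlags():
    found = PEAK_POSNUM | PEAK_HEIGHTNUM      # position and height found
    assert found & PEAK_POSNUM                # position bit is set
    assert not found & PEAK_RESONANCENUM      # no resonance assignment yet
    assert found != PEAK_ALLFOUND             # linewidth still missing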
class UnquotedValue(str):
    """A plain string - the only difference is the type: 'UnquotedValue'.
    Used to distinguish values from STAR files that were not quoted.
    STAR special values (null, unknown, ...) are only recognised when they
    appear as unquoted strings."""
    pass
# Constants for I/O of standard values
NULLSTRING = UnquotedValue('.')
UNKNOWNSTRING = UnquotedValue('?')
TRUESTRING = UnquotedValue('true')
FALSESTRING = UnquotedValue('false')
NANSTRING = UnquotedValue('NaN')
PLUSINFINITYSTRING = UnquotedValue('Infinity')
MINUSINFINITYSTRING = UnquotedValue('-Infinity')
SPARKY_ROOT = 'root'
SPARKY_DATA = 'data'
SPARKY_VERSION = 'version'
SPARKY_ENDDICT = '<end'
SPARKY_PATHNAME = 'pathname'
SPARKY_SPECTRUM = 'spectrum'
SPARKY_NAME = 'name'
SPARKY_ATTACHEDDATA = 'attached data'
SPARKY_ORNAMENT = 'ornament'
SPARKY_SPARKY = 'sparky'
SPARKY_SAVEFILES = 'savefiles'
SPARKY_PROJECT = 'project file'
SPARKY_SAVE = 'save file'
SPARKY_MOLECULE = 'molecule'
SPARKY_RESONANCES = 'resonances'
SPARKY_DEFAULTCHAIN = '@-'
SPARKY_SEQUENCE = 'sequence'
SPARKY_FIRSTRESIDUENUM = 'first_residue_number'
SPARKY_CONDITION = 'condition'
SPARKY_NESTED = 'local'
SPARKY_PEAK = 'peak'
SPARKY_MODIFIEDNAME = 'SparkyModifiedName'
SPARKY_ORIGINALNAME = 'SparkyOriginalName'
SPARKY_HASHNAME = 'SparkyHashName'
def getSparkyTokenIterator(text):
    """Return an iterator over all STAR tokens in a Sparky file text"""
    return (SparkyToken(x.lastindex, x.group(x.lastindex))
            for x in _sparky_pattern.finditer(text))
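# Illustrative sketch (not part of the original reader): tokenising a tiny,
# hypothetical Sparky fragment.  Each item is a SparkyToken(type, value)
# namedtuple whose type index matches the SP_TOKEN_* constants above.
def _exampleTokenise():
    text = "<sparky save file>\n<version 3.114>\n<end save file>\n"
    for token in getSparkyTokenIterator(text):
        print(token.type, repr(token.value))
    # expected types in order: SP_TOKEN_SPARKY_PROJECT, SP_TOKEN_VERSION,
    # SP_TOKEN_END_SPARKY_DICT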
class SparkySyntaxError(ValueError):
    pass
# class NamedOrderedDict(OrderedDict):
# def __init__(self, name=None):
# super(NamedOrderedDict, self).__init__()
# self.name = name
#
# def __str__(self):
# return '%s(name=%s)' % (self.__class__.__name__, self.name)
#
# def __repr__(self):
# return '%s(%s, name=%s)' % (self.__class__.__name__, list(tt for tt in self.items()), self.name)
#
# def addItem(self, tag, value):
# if tag in self:
# raise ValueError("%s: duplicate key name %s" % (self, tag))
# else:
# self[tag] = value
class SparkyDict(NamedOrderedDict):
"""Top level container for general STAR object tree"""
def __init__(self, name=SPARKY_ROOT):
super(SparkyDict, self).__init__(name=name)
    def getParameter(self, name=SPARKY_NAME, firstOnly=False):
dataBlocks = [self[db] for db in self.keys() if name in db]
if dataBlocks:
if firstOnly:
return dataBlocks[0]
else:
return dataBlocks
else:
return None
    def getData(self, name=SPARKY_DATA, firstOnly=False):
        dataBlocks = [self[db] for db in self.keys() if name in db]
        dataBlocks = [ll for x in dataBlocks for ll in x]  # concatenate data lists
if dataBlocks:
if firstOnly:
return dataBlocks[0]
else:
return dataBlocks
else:
return None
    def getDataValues(self, value, firstOnly=False):
dataBlocks = [self[db] for db in self.keys() if SPARKY_DATA in db]
spList = []
for spType in dataBlocks:
if spType:
spType = [re.findall(r'%s\s?(.*)\s*' % value, sT) for sT in spType]
spList.extend([ll for x in spType for ll in x])
if spList:
if firstOnly:
return spList[0]
else:
return spList
else:
return None
    def _getBlocks(self, value, blocks=None):
        # accumulate matches into a fresh list per call - a mutable default
        # argument would keep its contents between calls
        if blocks is None:
            blocks = []
        if value in self.name:
            blocks.append(self)
        for ky in self.keys():
            if isinstance(self[ky], SparkyDict):
                self[ky]._getBlocks(value, blocks)
        return blocks
    def getBlocks(self, value, firstOnly=False):
        blocks = self._getBlocks(value)
        if blocks:
            if firstOnly:
                return blocks[0]
            else:
                return blocks
        else:
            return None
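# Illustrative sketch (not part of the original reader): building a small
# SparkyDict tree by hand and querying it.  The keys below are invented;
# real trees are produced by CcpnSparkyReader.parseSparkyFile, and addItem
# comes from the imported NamedOrderedDict.
def _exampleSparkyDict():
    root = SparkyDict('root')
    child = SparkyDict('spectrum')
    root.addItem('spectrum', child)
    child.addItem('name hsqc', 'hsqc')
    assert root.getBlocks('spectrum', firstOnly=True) is child   # search by block name
    assert child.getParameter('name', firstOnly=True) == 'hsqc'  # search by key substring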
class TypeBlock(SparkyDict):
    # cheat and just copy the SparkyDict under a new name
    def __init__(self, name=SPARKY_ROOT):
        super(TypeBlock, self).__init__(name=name)
class CcpnSparkyReader:
def __init__(self, application, specificationFile: str = None, mode: str = 'standard',
testing: bool = False):
# just copied from Rasmus for the minute
self.application = application
self.mode = mode
self.saveFrameName = None
self.warnings = []
self.errors = []
self.testing = testing
self.text = None
self.tokeniser = None
self.allowSquareBracketStrings = False
self.lowerCaseTags = True
self.enforceSaveFrameStop = False
self.enforceLoopStop = False
self.stack = []
self.globalsCounter = 0
self.columns = ['ResidueType', 'ResidueCode', 'FirstAtomName', 'SecondAtomName']
    def _processVersion(self, value):
        # next token must be version
        stack = self.stack
        last = stack[-1]
        if isinstance(last, SparkyDict):
            last[SPARKY_VERSION] = value
        elif isinstance(last, list):
            try:
                func = last.append
            except AttributeError:
                raise SparkySyntaxError(self._errorMessage("Error inserting version num %s" % value,
                                                           value))
            func(value)
        return
def _processComment(self, value):
# Comments are ignored
return
    def processValue(self, value):
stack = self.stack
last = stack[-1]
# if isinstance(last, SparkyProjectBlock):
# # currently ignore until we have a SparkyDict
# return
if isinstance(last, TypeBlock): # assume in a type block for now
try:
# parse the line again and add to the dictionary
typeList = self._getTokens(value, str)
last[typeList[0]] = typeList[1:]
except AttributeError:
raise SparkySyntaxError(self._errorMessage('Error processing typeList',
value))
if isinstance(last, SparkyDict):
return
if isinstance(last, str):
# Value half of tag, value pair
stack.pop()
stack[-1].addItem(last, value)
elif isinstance(last, OrderedDict): # assume in a type block for now
try:
# parse the line again and add to the dictionary
typeList = self._getTokens(value, str)
last[typeList[0]] = typeList[1:]
except AttributeError:
raise SparkySyntaxError(self._errorMessage('Error processing typeList',
value))
else:
try:
func = last.append
except AttributeError:
raise SparkySyntaxError(self._errorMessage("Data value %s must be in item or loop_" % value,
value))
func(value)
    def _processBadToken(self, value, typ):
        raise SparkySyntaxError(self._errorMessage("Illegal token of type %s: %s" % (typ, value), value))
def _addSparkyDict(self, name):
container = self.stack[-1]
currentNames = [ky for ky in container.keys() if name in ky]
if currentNames:
name = name + str(len(currentNames)) # add an incremental number to the name
# name is the new named block
obj = SparkyDict(name)
container.addItem(name, obj)
self.stack.append(obj)
self.stack.append(list()) # put a list on as well
def _addTypeBlock(self, name):
container = self.stack[-1]
# currentNames = [ky for ky in container.keys() if name in ky]
# if currentNames:
# name = name+str(len(currentNames)) # add an incremental number to the name
# name is the new named block
obj = TypeBlock(name)
if name not in container:
container.addItem(name, list()) # add a list inside the parent block
self.stack.append(obj) # and add a new block to the end
# self.stack.append(OrderedDict()) # put a list on as well?
def _closeSparkyDict(self, value):
stack = self.stack
lowerValue = value.lower()
# Terminate loop
if isinstance(stack[-1], list):
if self.enforceLoopStop:
raise SparkySyntaxError(
self._errorMessage("Loop terminated by %s instead of stop_" % value, value)
)
else:
# Close loop and pop it off the stack
self._closeList(value)
if isinstance(stack[-1], TypeBlock):
self._closeDict(stack[-1].name) # force close, popping the TypeBlock
# terminate SparkyDict
if isinstance(stack[-1], SparkyDict):
if stack[-1].name.startswith(value):
# Simple terminator. Close sparky block
stack.pop()
            elif self.enforceSaveFrameStop:
                raise SparkySyntaxError(self._errorMessage("SaveFrame terminated by %s instead of save_" % value, value))
else:
# New saveframe start. We are missing the terminator, but close and continue anyway
getLogger().warning('Closing sparkyDict with missing terminator')
stack.pop()
if stack:
stack.append(list()) # in case there are more data items to add
if not isinstance((stack[-1]), SparkyDict):
if lowerValue.startswith(SPARKY_ENDDICT):
raise SparkySyntaxError(self._errorMessage("'%s' found out of context" % value, value))
def _openSparkyDict(self, value):
# start a new sparky block, which is everything
stack = self.stack
# Add new SparkyDict
if self.lowerCaseTags:
value = value.lower()
if isinstance(stack[-1], SparkyDict):
self._addSparkyDict(value)
elif isinstance(stack[-1], list):
self._closeList(value) # close the list and store
self._addSparkyDict(value)
else:
raise SparkySyntaxError(
self._errorMessage("SparkyDict start out of context: %s" % value, value)
)
def _openTypeBlock(self, value):
# start a new sparky block, which is everything
stack = self.stack
# Add new SparkyDict
if self.lowerCaseTags:
value = value.lower()
if isinstance(stack[-1], TypeBlock):
if value != SPARKY_NESTED: # fix for the minute
self._closeDict(value) # close the list and store
self._addTypeBlock(value)
elif isinstance(stack[-1], SparkyDict):
self._addTypeBlock(value)
elif isinstance(stack[-1], list):
self._closeList(value) # close the list and store
self._addTypeBlock(value)
# elif isinstance(stack[-1], OrderedDict):
# self._closeDict(value) # close the list and store
# self._addTypeBlock(value)
else:
raise SparkySyntaxError(
self._errorMessage("SparkyDict start out of context: %s" % value, value)
)
def _closeDict(self, value):
stack = self.stack
data = stack.pop() # remove the dict from the end
block = stack[-1] # point to the last block
if not isinstance(block, SparkyDict):
if isinstance(data, SparkyDict):
raise TypeError("Implementation error, loop not correctly put on stack")
else:
raise SparkySyntaxError(self._errorMessage("Error: %s outside list" % value, value))
if data:
dataName = data.name
if dataName in block:
block[dataName].append(data) # SHOULD be in the block
else:
dataName = data.name
            if dataName in block and not block[dataName]:  # remove the left-over tag
                del block[dataName]
def _closeList(self, value):
stack = self.stack
data = stack.pop() # remove the list from the end
block = stack[-1] # point to the last block
if not isinstance(block, SparkyDict):
if isinstance(data, SparkyDict):
raise TypeError("Implementation error, loop not correctly put on stack")
else:
raise SparkySyntaxError(self._errorMessage("Error: %s outside list" % value, value))
# columnCount = len(loop._columns)
# if not columnCount:
# raise SparkySyntaxError(self._errorMessage(" loop lacks column names", value))
if data:
dataName = SPARKY_DATA
currentNames = [ky for ky in block.keys() if SPARKY_DATA in ky]
if currentNames:
dataName = SPARKY_DATA + str(len(currentNames)) # add an incremental number to the name
block[dataName] = data
# if len(data) % columnCount:
# if self.padIncompleteLoops:
# print("WARNING Token %s: %s in %s is missing %s values. Last row was: %s"
# % (self.counter, loop, self.stack[-2],
# columnCount - (len(data) % columnCount), data[-1]))
# else:
# raise SparkySyntaxError(
# self._errorMessage("loop %s is missing %s values"
# % (loop, (columnCount - (len(data) % columnCount))), value)
# )
#
# # Make rows:
# args = [iter(data)] * columnCount
# for tt in zip_longest(*args, fillvalue=NULLSTRING):
# loop.newRow(values=tt)
else:
# empty loops appear here. We allow them, but that could change
pass
    def _getToken(self, text, value):
        # return the value'th token from a string
        # just a simple string NOT containing whitespace or the '|' pipe character
        vals = re.findall(r"""(?:\|\s*|\s*)([a-zA-Z0-9,._^'";$!^/-]+)""", text)
        try:
            return vals[value]
        except (IndexError, TypeError):
            return None
    def _getTokens(self, text, funcType, start=0, end=0):
        # return the tokens from start..end cast as funcType
        # simple strings NOT containing whitespace or the '|' pipe character
        vals = re.findall(r"""(?:\|\s*|\s*)([a-zA-Z0-9,._^'";$!^/-]+)""", text)
        try:
            valEnd = len(vals) - 1
            if start >= end:
                end = valEnd
            else:
                end = min(end, valEnd)
            return [funcType(val) for val in vals[start:end + 1]]
        except (ValueError, TypeError):
            return None
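    # Illustrative sketch (hypothetical data line, not part of the original
    # reader): _getToken/_getTokens split a line on whitespace and '|'
    # separators and optionally cast the values.
    def _exampleGetTokens(self):
        line = 'pos 8.123 | 120.45'
        assert self._getToken(line, 0) == 'pos'
        assert self._getTokens(line, float, start=1) == [8.123, 120.45]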
    def parseSparkyFile(self, path):
_path = aPath(path)
if not _path.exists():
raise FileNotFoundError('File %s does not exist' % path)
        with open(_path, 'r') as fp:
            self.text = fp.read()
self.tokeniser = getSparkyTokenIterator(self.text)
processValue = self.processValue
processFunctions = [None] * 21
processFunctions[SP_TOKEN_SQUOTE_STRING] = self.processValue
processFunctions[SP_TOKEN_DQUOTE_STRING] = self.processValue
# processFunctions[SP_TOKEN_MULTILINE] = self.processValue
processFunctions[SP_TOKEN_VERSION] = self._processVersion
processFunctions[SP_TOKEN_COMMENT] = self._processComment
# processFunctions[SP_TOKEN_LOOP] = self._openLoop
# processFunctions[SP_TOKEN_LOOP_STOP] = self._closeLoop
# processFunctions[SP_TOKEN_GLOBAL] = self._processGlobal
# processFunctions[SP_TOKEN_DATA_DICT] = self._processDataBlock
unquotedValueTags = (SP_TOKEN_STRING, SP_TOKEN_NULL, SP_TOKEN_UNKNOWN, SP_TOKEN_SAVE_FRAME_REF)
# quotedValueTags = (TOKEN_SQUOTE_STRING, TOKEN_DQUOTE_STRING, TOKEN_MULTILINE)
stack = self.stack
name = aPath(path).basename
# result = SparkyProjectBlock(name=name)
# stack.append(result)
# now process the file
        value = None
        result = None  # only set once a sparky-project token is found
        self.counter = 0  # Token counter
try:
for tk in self.tokeniser:
self.counter += 1
typ, value = tk
if typ in unquotedValueTags:
value = UnquotedValue(value)
processValue(value)
else:
func = processFunctions[typ]
if func is None:
if typ == SP_TOKEN_SPARKY_PROJECT:
# put the first element on the stack
result = SparkyDict(name=name) # result is the actually object, which SHOULD contain all
stack.append(result)
stack.append(list()) # put an empty list on the stack
processValue("sparky %s" % value) # put the type on the stack
processValue("name %s" % name)
processValue("pathname %s" % aPath(path).parent)
elif typ == SP_TOKEN_SPARKY_DICT:
self._openSparkyDict(value)
elif typ == SP_TOKEN_TYPE_DICT:
self._openTypeBlock(value)
elif typ == SP_TOKEN_TYPE_NESTED:
self._openTypeBlock(SPARKY_NESTED)
elif typ == SP_TOKEN_END_NESTED:
self._closeDict(SPARKY_NESTED)
elif typ == SP_TOKEN_END_SPARKY_DICT:
self._closeSparkyDict(value)
elif typ in (SP_TOKEN_BAD_CONSTRUCT, SP_TOKEN_BAD_TOKEN):
self._processBadToken(value, typ)
elif typ == SP_TOKEN_SQUARE_BRACKET:
if self.allowSquareBracketStrings:
processValue(UnquotedValue(value))
else:
self._processBadToken(value, typ)
else:
raise SparkySyntaxError("Unknown token type: %s" % typ)
else:
func(value)
# End of data - clean up stack
if isinstance(stack[-1], str):
raise SparkySyntaxError(self._errorMessage("File ends with item name", value))
self._closeSparkyDict(name)
# if isinstance(stack[-1], list):
# self._closeList('<End-of-File>')
#
# if isinstance(stack[-1], SparkyDict):
# stack.pop()
if stack:
raise RuntimeError(self._errorMessage("stack not empty at end of file", value))
except Exception as es:
print("ERROR at token %s" % self.counter)
raise
#
return result
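    # Illustrative usage sketch (assumption: parsing alone does not need a
    # live application, since parseSparkyFile only reads the file and builds
    # the SparkyDict tree; the path is hypothetical):
    #
    #     reader = CcpnSparkyReader(application=None)
    #     tree = reader.parseSparkyFile('/path/to/project.proj')
    #     print(tree.getDataValues(SPARKY_PATHNAME, firstOnly=True))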
def _errorMessage(self, msg, value):
"""Make standard error message"""
template = "Error in context: %s, at token %s, line: %s\n%s"
tags = [(x if isinstance(x, str) else x.name) for x in self.stack[1:]] + [value]
lines = self.text.splitlines()
lineCount = len(lines)
ii = 0
if tags:
jj = 0
tag = tags[jj]
while ii < lineCount:
if tag in lines[ii].lower().split():
# This line contains the current tag - go to the next tag
jj += 1
if jj < len(tags):
tag = tags[jj]
else:
# This line contains the last of the tags - it is the line we want
break
else:
# nothing found here - try next line
ii += 1
#
return template % (tags[:-1], tags[-1], ii + 1, msg) #
def _getSparkyDataList(self, sparkyDict, value):
if SPARKY_DATA in sparkyDict and sparkyDict[SPARKY_DATA]:
spType = sparkyDict[SPARKY_DATA]
if spType:
spType = [re.findall(r'%s\s?(.*)\s*' % value, sT) for sT in spType]
spType = [ll for x in spType for ll in x]
if spType:
return spType
return None
    def _getSparkyDict(self, sparkyDict, name, found=None):
        # accumulate matches into a fresh list per call - a mutable default
        # argument would keep its contents between calls
        if found is None:
            found = []
        if name in sparkyDict.name:
            found.append(sparkyDict)
        for ky in sparkyDict.keys():
            if isinstance(sparkyDict[ky], SparkyDict):
                self._getSparkyDict(sparkyDict[ky], name, found)
        return found
    def importSpectra(self, project, saveBlock, parentBlock=None):
getLogger().info('Importing Sparky spectra: %s' % saveBlock.name)
# process the save files to get the spectra
pathName = saveBlock.getDataValues(SPARKY_PATHNAME, firstOnly=True)
spectra = saveBlock.getBlocks(SPARKY_SPECTRUM, firstOnly=True)
fileName = spectra.getDataValues(SPARKY_NAME, firstOnly=True)
filePath = spectra.getDataValues(SPARKY_PATHNAME, firstOnly=True)
# workshopPath = os.path.abspath(
# os.path.join(pathName, '../lists/' + fileName + '.list.workshop'))
spectrumPath = (aPath(pathName) / aPath(filePath)).normalise()
loadedSpectrum = None
if spectrumPath.exists():
getLogger().info('Loading spectrum: %s' % spectrumPath)
            loadedSpectrum = project.application.loadData(spectrumPath)  # load the spectrum into the given project
if not loadedSpectrum:
if parentBlock:
fileName = aPath(parentBlock.getDataValues(SPARKY_NAME, firstOnly=True))
filePath = aPath(parentBlock.getDataValues(SPARKY_PATHNAME, firstOnly=True))
raise ValueError('Error in .proj file:\n{}\n<savefile>: {}'.format(filePath / fileName, spectrumPath))
else:
raise ValueError('Error in .save file.\n<savefile>: {}'.format(spectrumPath))
# need to remove any bad characters from the spectrum name
        spectrumName = loadedSpectrum[0].id  # loadData returns a list of spectra
spectra[SPARKY_MODIFIEDNAME] = spectrumName
spectra[SPARKY_ORIGINALNAME] = saveBlock.name # store the original name
spectra[SPARKY_HASHNAME] = str(abs(hash(spectrumName)) % (10 ** 3)) # testing for names that are too long
spectrum = loadedSpectrum[0] # project.getObjectsByPartialId(className='Spectrum', idStartsWith=spectrumName)
if spectrum:
spectrumShift = spectra.getDataValues('shift', firstOnly=True)
spectrumShiftVals = self._getTokens(spectrumShift, float)
assignRelation = spectra.getDataValues('assignRelation', firstOnly=False)
axes = [None] * len(assignRelation)
for ax in assignRelation:
axisNum = int(self._getToken(ax, 0))
axisType = int(self._getToken(ax, 1))
axisName = self._getToken(ax, 2)
if axisNum:
axes[axisNum - 1] = (axisType, axisName)
# generate the mapping of sparky axes to Ccpn axes
specAxes = spectrum.axisCodes
reorder = self._reorderAxes(axes, specAxes)
if reorder:
# apply the sparky spectrum shift to the Ccpn reference values
currentRefValues = list(spectrum.referenceValues)
for specInd in range(0, len(spectrumShiftVals)):
currentRefValues[specInd] = currentRefValues[specInd] + spectrumShiftVals[reorder[specInd]]
spectrum.referenceValues = currentRefValues
    def _reorderAxes(self, axes, newAxes):
        # map each CCPN axis code onto the index of the matching sparky axis
        renameAxes2 = [None] * len(newAxes)
        for ai, axis in enumerate(newAxes):
            ax2 = [ind for ind, aj in enumerate(axes) if aj and aj[1] and aj[1] in axis]
            if ax2:
                # found an element so put it into the list
                renameAxes2[ai] = ax2[0]
        # fill unmatched slots with whichever axis numbers are still missing
        # NB: compare against None - index 0 is a valid axis
        outAxes = [aj for aj, axisj in enumerate(renameAxes2) if axisj is None]
        for ii in outAxes:
            for jj in range(len(renameAxes2)):  # add any numbers that are missing from the list
                if jj not in renameAxes2:
                    renameAxes2[ii] = jj
                    break
        return renameAxes2
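    # Illustrative sketch (hypothetical axis names): with the sparky axes
    #     axes = [(1, 'HN'), (2, 'N')]    # built from the assignRelation lines
    #     newAxes = ['N', 'HN']           # spectrum.axisCodes
    # _reorderAxes returns [1, 0]: CCPN axis 0 ('N') takes its value from
    # sparky axis index 1, and CCPN axis 1 ('HN') from sparky axis index 0.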
def _testFirstPeak(self, peakData, spectrum, peakAxes):
if peakData and spectrum:
line = peakData[0].getData(PEAK_POS)
if line:
for peakAxis in itertools.permutations(peakAxes):
badAxes = self._testPermutation(line, peakAxis, spectrum)
if not badAxes:
return peakAxis
return peakAxes
def _testPermutation(self, line, peakAxis, spectrum):
badAxes = []
posList = [float(val) for val in line]
peakPos = [posList[i] for i in peakAxis]
for ind, val in enumerate(peakPos):
viewParams = (spectrum.aliasingLimits)[ind]
if (val > max(viewParams)) or (val < min(viewParams)):
badAxes.append(ind)
getLogger().warning("Axes error: axis '%s' is out of bounds" % spectrum.axisCodes[ind])
return badAxes
    def importPeakLists(self, project, saveBlock, sparkyDict):
# process the save files to get the spectra
pathName = saveBlock.getDataValues(SPARKY_PATHNAME, firstOnly=True)
spectra = saveBlock.getBlocks(SPARKY_SPECTRUM, firstOnly=True)
molName = spectra.getDataValues(SPARKY_MOLECULE, firstOnly=True)
condName = spectra.getDataValues(SPARKY_CONDITION, firstOnly=True)
# TODO:ED this needs to be a unique name
defaultName = spectra.getParameter(SPARKY_HASHNAME, firstOnly=True)
nmrChainName = self._buildName([molName, condName], default=defaultName)
attachedPeak = spectra.getBlocks(SPARKY_ATTACHEDDATA, firstOnly=True)
spectrumName = spectra.getParameter(SPARKY_MODIFIEDNAME, firstOnly=True)
# current test to decide whether to import a peak list
peakAxes = attachedPeak.getDataValues('peak_pattern_axes', firstOnly=True)
peakName = attachedPeak.getDataValues('peak_pattern_name', firstOnly=True)
peakPatternAxes = None
        # this test is disabled for now - always try to import the peak list
        if True:  # (peakAxes is not None and peakName is not None)
# assume that we have to import a peaklist
if peakAxes:
peakPatternAxes = self._getTokens(peakAxes, int)
assignRelation = spectra.getDataValues('assignRelation', firstOnly=False)
axes = [None] * len(assignRelation)
# not correct...
for ax in assignRelation:
axisNum = int(self._getToken(ax, 0))
axisType = int(self._getToken(ax, 1))
axisName = self._getToken(ax, 2)
if axisNum:
axes[axisNum - 1] = (axisType, axisName)
errorLine = 0
try:
project.suspendNotification()
if len(assignRelation) != len([ax for ix, ax in enumerate(axes) if axes[ix]]):
getLogger().info('assignRelation not found: %s' % saveBlock.name)
else:
# read in the peak list
peakBlock = spectra.getBlocks(SPARKY_ORNAMENT, firstOnly=True)
if peakBlock:
peakData = peakBlock.getData(name=SPARKY_PEAK)
if peakData:
spectrum = project.getObjectsByPartialId(className='Spectrum', idStartsWith=spectrumName)
if spectrum:
newPeakList = spectrum[0].peakLists[0] # get the first one .newPeakList()
if not peakPatternAxes:
spectrumAxes = spectrum[0].axisCodes
peakPatternAxes = self._reorderAxes(axes, spectrumAxes)
# TODO:ED need to remove hard coding for search of properties
# ii=0
# while ii<len(peakData)-PEAK_MAXSEARCH:
# test the first peak to check that it fits in the max/minAliasedFrequency
peakPatternAxes = self._testFirstPeak(peakData, spectrum[0], peakPatternAxes)
# iterate over the peaks
for thisPeak in peakData:
# if self._getToken(peakData[ii], 0) == PEAK_TYPE\
# and self._getToken(peakData[ii], 1) == PEAK_PEAK:
# TODO:ED put some more error checking in here - need to parse properly
found = 0
# find the peak position
line = thisPeak.getData(PEAK_POS)
if line:
found = found | PEAK_POSNUM
posList = [float(val) for val in line]
# find the resonance numbering
line = thisPeak.getData(PEAK_RESONANCE)
if line:
found = found | PEAK_RESONANCENUM
resList = line
# find the peak height
line = thisPeak.getData(PEAK_HEIGHT)
if line:
found = found | PEAK_HEIGHTNUM
heightList = line
# find the peak linewidth
line = thisPeak.getData(PEAK_LINEWIDTH)
if line:
found = found | PEAK_LINEWIDTHNUM
linewidthList = line
                                    # only valid when a 'pos' entry was found for this peak
                                    peakPos = [posList[i] for i in peakPatternAxes] if (found & PEAK_POSNUM) else None
if found:
# make a new peak
peak = newPeakList.newPeak()
if found & PEAK_POSNUM:
peak.position = peakPos
if found & PEAK_HEIGHTNUM and len(heightList) > 1:
peak.height = float(heightList[1])
if found & PEAK_LINEWIDTHNUM and len(linewidthList) > 1:
peak.lineWidths = tuple(float(lw) for lw in linewidthList[:2])
if found & PEAK_RESONANCENUM:
nmrChain = project.fetchNmrChain(nmrChainName)
ri = 0
while ri < len(resList):
resType = resList[ri][0]
resName = resList[ri][1:] # clip the chain type from the head
axisCode = resList[ri + 1]
nmrResidue = nmrChain.fetchNmrResidue(sequenceCode=resName, residueType=resType)
self._fetchAndAssignNmrAtom(peak, nmrResidue, axisCode)
ri += 2
# if found & (PEAK_POSNUM | PEAK_RESONANCENUM): # test without residue
# peak = newPeakList.newPeak(ppmPositions=peakPos)
#
# elif found == PEAK_ALLFOUND:
# # TODO:ED check with specta other than N-H, multidimensional etc.
#
# peak = newPeakList.newPeak(ppmPositions=peakPos)
#
# # TODO:ED check that the molName matches molecule/condition
# nmrChain = project.fetchNmrChain(nmrChainName)
#
# ri=0
# while ri < len(resList):
# resType = resList[ri][0]
# resName = resList[ri][1:] # clip the chain type from the head
# axisCode = resList[ri+1]
# nmrResidue = nmrChain.fetchNmrResidue(sequenceCode=resName, residueType=resType)
# self._fetchAndAssignNmrAtom(peak, nmrResidue, axisCode)
# ri += 2
            except Exception as es:
                getLogger().warning('Error importing peakList: %s - %s' % (saveBlock.name, es))
finally:
project.resumeNotification()
    def importSparkyProject(self, project, sparkyDict):
"""Import entire project from dataBlock into empty Project"""
t0 = time.time()
self.warnings = []
self.project = project
# traverse the sparkyDict and insert into project
sparkyType = sparkyDict.getDataValues(SPARKY_SPARKY, firstOnly=True)
if sparkyType == SPARKY_PROJECT:
# load project file
fileName = sparkyDict.getDataValues(SPARKY_NAME, firstOnly=True)
filePath = sparkyDict.getDataValues(SPARKY_PATHNAME, firstOnly=True)
saveFiles = sparkyDict.getBlocks(SPARKY_SAVEFILES, firstOnly=True)
loadedBlocks = []
for sp in saveFiles.getData():
savefilePath = aPath(filePath) / sp
if not savefilePath.exists():
getLogger().warning('Skipping %s, file not found' % savefilePath)
else:
loadedBlocks.append(self.parseSparkyFile(savefilePath))
# now import the molecule from the main project file
self.importSparkyMolecule(project, sparkyDict)
# load spectrum data
for isf in loadedBlocks:
self.importSpectra(project, isf, sparkyDict) # modify to load from the project
self.importPeakLists(project, isf, sparkyDict) # modify to load from the project
elif sparkyType == SPARKY_SAVE:
self.importSpectra(project, sparkyDict)
self.importPeakLists(project, sparkyDict, sparkyDict) # modify to load from the project
else:
getLogger().warning('Unknown Sparky File Type')
t2 = time.time()
getLogger().debug('Imported Sparky file into project, time = %.2fs' % (t2 - t0))
for msg in self.warnings:
print('====> ', msg)
self.project = None
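    # Illustrative usage sketch (assumes a live CCPN application and an empty
    # project; the path is hypothetical):
    #
    #     reader = CcpnSparkyReader(application)
    #     sparkyDict = reader.parseSparkyFile('/path/to/Projects/project.proj')
    #     reader.importSparkyProject(application.project, sparkyDict)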
    def _createDataFrame(self, input_path):
        # whitespace-delimited sparky peak-list export
        return pd.read_csv(input_path, sep=r'\s+')
def _splitAssignmentColumn(self, dataFrame):
""" parses the assignment column.
Splits the column assignment in four columns: ResidueName ResidueCode AtomName1 AtomName2.
"""
        assignments = [re.findall(r'\d+|\D+', s) for s in dataFrame.iloc[:, 0]]
assignmentsColumns = []
for a in assignments:
try:
i, j, *args = a
atoms = (''.join(args)).split('-')
if len(atoms) == 2:
firstAtom, secondAtom = atoms
assignmentsColumns += ((i, j, firstAtom, secondAtom),)
            except ValueError:
                getLogger().warning('Undefined atom assignment %s' % str(a))
return pd.DataFrame(assignmentsColumns, columns=self.columns)
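    # Illustrative sketch (hypothetical assignment string): 'T102N-H' is split
    # by the digit/non-digit regex into ['T', '102', 'N-H'], which unpacks to
    # the row ('T', '102', 'N', 'H') under the columns
    # (ResidueType, ResidueCode, FirstAtomName, SecondAtomName).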
    def _mergeDataFrames(self, generalDF, assignmentDF):
        """
        :param generalDF: first dataframe with the assignments all in one column
        :param assignmentDF: assignments dataframe in 4 columns
        :return: new dataframe with the four assignment columns + the original minus its first column
        """
        partialDf = generalDF.drop(generalDF.columns[0], axis=1)
        # NB: pandas removed concat's join_axes argument; reindex gives the same result
        return pd.concat([assignmentDF, partialDf], axis=1).reindex(partialDf.index)
    def _correctChainResidueCodes(self, chain, ccpnDataFrame):
        """Rename the chain residue codes to match the dataFrame; residues whose
        new code would be a duplicate are deleted instead.
        """
        for residue, resNumber in zip(chain.residues, ccpnDataFrame.ResidueCode):
            try:
                residue.rename(str(resNumber))
            except Exception:
                residue.delete()
        return chain
    def _createCcpnChain(self, project, ccpnDataFrame):
        """Make a chain from the ResidueTypes.
        CCPN expects a single string of one-letter codes without spaces"""
        residueTypes = ''.join([i for i in ccpnDataFrame.ResidueType])
newChain = project.createChain(residueTypes, molType='protein')
self._correctChainResidueCodes(newChain, ccpnDataFrame)
return newChain
def _createNewCcpnChain(self, project, chainList, resList):
newChain = project.createChain(chainList, molType='protein')
        for residue, resNumber in zip(newChain.residues, resList):
            try:
                residue.rename(str(resNumber))
            except Exception:
                residue.delete()
return newChain
def _fetchAndAssignNmrAtom(self, peak, nmrResidue, atomName):
isotopeCode = DEFAULT_ISOTOPE_DICT.get(str(atomName))
atom = nmrResidue.fetchNmrAtom(name=str(atomName), isotopeCode=isotopeCode)
peak.assignDimension(axisCode=atomName[0], value=[atom])
def _connectNmrResidues(self, nmrChain):
updatingNmrChain = None
nrs = nmrChain.nmrResidues
for i in range(len(nrs) - 1):
currentItem, nextItem = nrs[i], nrs[i + 1]
if currentItem and nextItem:
# check that the sequence codes are consecutive
if int(nextItem.sequenceCode) == int(currentItem.sequenceCode) + 1:
updatingNmrChain = currentItem.connectNext(nextItem, )
return updatingNmrChain
def _assignNmrResiduesToResidues(self, connectedNmrChain, ccpnChain):
for nmrResidue, residue in zip(connectedNmrChain.nmrResidues, ccpnChain.residues):
nmrResidue.residue = residue
def _parseDataFrame(self, ccpnDataFrame, spectrum, nmrChain):
lastNmrResidue = None
newPeakList = spectrum.newPeakList()
foundResNumber = list(ccpnDataFrame.iloc[:, 1])
for i, resType, resNumber, atom1, atom2, pos1, pos2, in zip(range(len(ccpnDataFrame.iloc[:, 0]) - 1), ccpnDataFrame.iloc[:, 0],
ccpnDataFrame.iloc[:, 1], ccpnDataFrame.iloc[:, 2],
ccpnDataFrame.iloc[:, 3], ccpnDataFrame.iloc[:, 4],
ccpnDataFrame.iloc[:, 5]):
peak = newPeakList.newPeak(ppmPositions=(float(pos2), float(pos1)))
            # in case of duplicated residues, e.g. sidechain W2023N-H and W2023NE1-HE1:
            # don't create a new nmrResidue, just add the atoms to the previous one
            if resNumber in foundResNumber[:i]:
nmrResidue = lastNmrResidue
if nmrResidue:
self._fetchAndAssignNmrAtom(peak, nmrResidue, atom2)
self._fetchAndAssignNmrAtom(peak, nmrResidue, atom1)
else:
nmrResidue = nmrChain.fetchNmrResidue(sequenceCode=str(resNumber))
lastNmrResidue = nmrResidue
if nmrResidue:
self._fetchAndAssignNmrAtom(peak, nmrResidue, atom2)
self._fetchAndAssignNmrAtom(peak, nmrResidue, atom1)
return nmrChain
    def _buildName(self, names, default=SPARKY_DEFAULTCHAIN):
        # remove spaces, commas, fullstops from names to be used as Pids
        exclude = ' ,.'
        regex = re.compile('[%s]' % re.escape(exclude))
        name = ''
        for nm in names:
            if nm:
                name = name + regex.sub('', nm)
        return name if name else default
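    # Illustrative sketch (hypothetical names):
    #     self._buildName(['my molecule', 'pH 7.0'])   # -> 'mymoleculepH70'
    #     self._buildName([None, ''])                  # -> SPARKY_DEFAULTCHAIN ('@-')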
    def initParser(self, project, input_path, spectrum):
generalDF = self._createDataFrame(input_path)
assignmentDF = self._splitAssignmentColumn(generalDF)
ccpnDataFrame = self._mergeDataFrames(generalDF, assignmentDF)
ccpnChain = self._createCcpnChain(project, ccpnDataFrame)
nmrChain = project.fetchNmrChain('A')
newNmrChain = self._parseDataFrame(ccpnDataFrame, spectrum, nmrChain)
connectedNmrChain = self._connectNmrResidues(newNmrChain)
self._assignNmrResiduesToResidues(connectedNmrChain, ccpnChain)
    def importSparkyMolecule(self, project, sparkyDict):
# read the molecules from the sparky project and load the resonances
getLogger().info('Importing Sparky molecular chains: %s' % sparkyDict.name)
molecules = sparkyDict.getBlocks(SPARKY_MOLECULE)
for mol in molecules:
molName = self._buildName([mol.getDataValues(SPARKY_NAME, firstOnly=True)], default=mol.name)
attData = mol.getBlocks(SPARKY_ATTACHEDDATA, firstOnly=True) # should be only one
seq = attData.getDataValues(SPARKY_SEQUENCE, firstOnly=True)
firstRes = attData.getDataValues(SPARKY_FIRSTRESIDUENUM, firstOnly=True)
if seq and firstRes:
newChain = project.createChain(sequence=seq,
molType='protein',
startNumber=int(firstRes),
compoundName=molName)
conditions = mol.getBlocks(SPARKY_CONDITION)
for condition in conditions:
conditionName = condition.getDataValues(SPARKY_NAME, firstOnly=True)
nmrChainName = self._buildName([molName, conditionName], molName)
# assume only one block in each condition
resBlock = condition.getBlocks(SPARKY_RESONANCES, firstOnly=True)
if resBlock:
getLogger().info(' Importing Sparky molecular chain: %s' % resBlock.name)
resList = resBlock.getData()
chain = ''
nmrResList = []
nmrAtomList = []
for res in resList:
try:
# vals = re.findall(r"""(?:\|\s*|\s*)([a-zA-Z0-9,._^'";$!^]+)""", res)
resType = self._getToken(res, 0)[0] # str(vals[0][0])
resName = self._getToken(res, 0)[1:] # str(vals[0][1:])
atomType = self._getToken(res, 1) # str(vals[1])
chemShift = float(self._getToken(res, 2)) # float(vals[2])
atomIsotopeCode = self._getToken(res, 3) # str(vals[3])
if resName not in nmrResList:
nmrResList.append(resName)
chain = chain + resType
nmrAtomList.append((resType, resName, atomType, chemShift, atomIsotopeCode))
                    except Exception as es:
                        getLogger().warning('Incorrect resonance: %s' % (res,))
# rename to the nmrResidue names in the project
nmrChain = project.fetchNmrChain(nmrChainName)
for resType, resName, atomType, chemShift, atomIsotopeCode in nmrAtomList:
nmrResidue = nmrChain.fetchNmrResidue(sequenceCode=resName, residueType=resType)
if nmrResidue:
newAtom = nmrResidue.newNmrAtom(name=atomType, isotopeCode=atomIsotopeCode)
# self.importConnectedNmrResidues(project, nmrChain)
    def importConnectedNmrResidues(self, project, nmrChain):
project.suspendNotification()
try:
connectedNmrChain = self._connectNmrResidues(nmrChain)
# self._assignNmrResiduesToResidues(connectedNmrChain, ccpnChain)
except Exception as es:
getLogger().warning('Error connecting nmrChain: %s' % (nmrChain.id,))
finally:
project.resumeNotification()
    def assignNmrChain(self, nmrChain):
# not done yet
# project.suspendNotification()
try:
pass
# connect the nmrResidues and assignTo
connectedNmrChain = self._connectNmrResidues(nmrChain)
# self._assignNmrResiduesToResidues(connectedNmrChain, ccpnChain)
except Exception as es:
getLogger().warning('Error connecting nmrChain: %s' % (nmrChain.id,))
finally:
pass
# project.resumeNotification()
class CcpnSparkyWriter:
# ejb - won't be implemented yet
def __init__(self, project: Project, specificationFile: str = None, mode: str = 'strict',
programName: str = None, programVersion: str = None):
self.project = project
self.mode = mode
if __name__ == '__main__':
axes = [(1, 'n'),
(2, 'h'),
(3, 'n'),
(4, 'h'),
(5, 'h'),
(6, 'ca'),
(7, 'ca'),
(8, 'cb'),
(9, 'h'),
(10, 'cb')]
newAxes = ['h1', 'h3', 'n', 'cb1', 'ca', 'cb', 'n1', 'h', 'ca1', 'h2']
    # smaller test case - overrides the larger axes/newAxes above
    axes = [(1, 'n'),
(2, 'n'),
(3, 'h')]
newAxes = ['h', 'n', 'n1']
for ai, axis in enumerate(axes):
ax2 = [aj for aj in axes[0:ai] if axis[1] in aj[1]]
if ax2:
axes[ai] = (axis[0], axis[1] + str(len(ax2)))
out2 = [ai for aj, axisj in enumerate(newAxes) for ai, axis in enumerate(axes) if axisj == axis[1]]
print(axes)
print(out2)