Source code for ccpn.util.nef.StarIo

"""I/O for NEF and NmrStar formats.

The functions to use are

and other STAR variants satisfying the following requirements:

      - all plain tags in a saveframe start with a common prefix;
      for NEF files this must be the '<sf_category>' followed by '.',
      and the framecode value must start with the '<sf_category>' followed by underscore.

      - All loop column names start with '<loopcategory>.'

      - loopcategories share a namespace with tags within a saveframe

      - DataBlocks can contain only saveframes.

      - For NEF files the

  Use the functions parseNmrStar, parseNef, parseNmrStarFile, parseNefFile

   The 'File' functions take a file name and pass the file contents to corresponding parser.

   The 'NmrStar' functions will read any Star file that satisfies the constraints above, while
    the 'Nef' functions will also enforce the NEF=-specific constraints above


  On reading tag prefixes ('_', 'save_', 'data_' are stripped,
  as are the parts of tags before the first '.'
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals


#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Geerten Vuister $"
__dateModified__ = "$dateModified: 2022-02-17 19:09:51 +0000 (Thu, February 17, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

# NB Assumes that file was parsed with lowercaseTags = True


import keyword
import os

from . import GenericStarParser


NULLSTRING = GenericStarParser.NULLSTRING
TRUESTRING = GenericStarParser.TRUESTRING
FALSESTRING = GenericStarParser.FALSESTRING
UNKNOWNSTRING = GenericStarParser.UNKNOWNSTRING
UnquotedValue = GenericStarParser.UnquotedValue

# Make target string (translator) for mapping, to work in Python 2 and 3 both
# Unprintable characters map to '_', bytes above 128 map to '?'
ll = 33 * ['_'] + list(chr(x) for x in range(33, 127)) + ['_'] + 128 * ['?']
# "'# (double quote, single quote, and pound sign) map to '?'
ll[34] = ll[35] = ll[39] = '?'
latin_1_to_framecode_translator = ''.join(ll)


[docs]def parseNmrStar(text, mode='standard'): """load NMRSTAR file""" dataExtent = GenericStarParser.parse(text, mode) converter = _StarDataConverter(dataExtent) converter.preValidate() result = converter.convert() # return result
[docs]def parseNmrStarFile(fileName, mode='standard', wrapInDataBlock=False): """parse NMRSTAR from file. :param fileName: path of the star-file to parse :param mode: parsing mode: any of ('lenient', 'strict', 'standard', 'IUCr') :param wrapInDataBlock: flag; if True a missing DataBlock start will be added :return NmrDataBlock instance """ with open(fileName) as fp: text = fp.read() if wrapInDataBlock and 'save_' in text and not 'data_' in text: text = "data_dummy \n\n" + text dataExtent = GenericStarParser.parse(text, mode) converter = _StarDataConverter(dataExtent, fileType='star') converter.preValidate() result = converter.convert() # return result
[docs]def parseNef(text, mode='standard'): """load NEF from string""" dataExtent = GenericStarParser.parse(text, mode) converter = _StarDataConverter(dataExtent, fileType='nef') converter.preValidate() result = converter.convert() # return result
[docs]def parseNefFile(fileName, mode='standard', wrapInDataBlock=False): """parse NEF from file if wrapInDataBlock missing DataBlock start will be provided""" with open(fileName) as fp: text = fp.read() if wrapInDataBlock and 'save_' in text and not 'data_' in text: text = "data_dummy \n\n" + text dataExtent = GenericStarParser.parse(text, mode) converter = _StarDataConverter(dataExtent, fileType='nef') converter.preValidate() result = converter.convert() # return result
[docs]def string2FramecodeString(text): # Replace code points outside latin-1 range (more than one byte) with '?' result = text.encode('latin_1', 'replace').decode('latin_1') # Translate string, using preset translator result = result.translate(latin_1_to_framecode_translator) # return result
[docs]class StarValidationError(ValueError): pass
[docs]class NmrDataExtent(GenericStarParser.DataExtent): """Top level container (OrderedDict) for NMRSTAR/NEF object tree""" pass
# # We insert these afterwards as we want the functions at the top of the file # # but can only annotate after DataExtent is created # parseNef.__annotations__['return'] = NmrDataExtent # parseNefFile.__annotations__['return'] = NmrDataExtent # parseNmrStar.__annotations__['return'] = NmrDataExtent # parseNmrStarFile.__annotations__['return'] = NmrDataExtent
[docs]class NmrLoop(GenericStarParser.Loop): """Loop for NMRSTAR/NEF object tree The contents, self.data is a list of OrderedDicts matching the column names. rows can be modified or deleted from data, but adding new rows directly is likely to break - use the newRow function.""" @property def category(self): """Loop category tag - synonym for name (unlike the case of SaveFrame)""" return self.name @property def tagPrefix(self): """Prefix to use before item tags on output""" return '_%s.' % self.name
[docs]class NmrSaveFrame(GenericStarParser.SaveFrame): """SaveFrame (OrderedDict)for NMRSTAR/NEF object tree""" def __init__(self, name=None, category=None): super(NmrSaveFrame, self).__init__(name=name) self.category = category @property def tagPrefix(self): """Prefix to use before item tags on output""" return '_%s.' % self.category
[docs] def newLoop(self, name, columns): """Make new NmrLoop and add it to the NmrSaveFrame""" loop = NmrLoop(name, columns) self.addItem(name, loop) return loop
[docs]class NmrDataBlock(GenericStarParser.DataBlock): """DataBlock (OrderedDict)for NMRSTAR/NEF object tree"""
[docs] def newSaveFrame(self, name, category): """Make new NmrSaveFrame and add it to the DataBlock""" name = string2FramecodeString(name) saveFrame = NmrSaveFrame(name, category=category) self.addItem(name, saveFrame) saveFrame.addItem('sf_category', category) saveFrame.addItem('sf_framecode', name) return saveFrame
[docs] def addSaveFrame(self, saveFrame): """Add existing NmrSaveFrame to the DataBlock""" self.addItem(saveFrame['sf_framecode'], saveFrame)
[docs]class NmrLoopRow(GenericStarParser.LoopRow): pass
class _StarDataConverter: """Converter from output of a GeneralStarParser to a NEF or NMRSTAR nested data structure NB Function assumes valid data as output from GeneralStarParser with lowerCaseTags settings and does not double check validity.""" validFileTypes = ('nef', 'star') def __init__(self, dataExtent, fileType='star', specification=None, convertColumnNames=True): # Set option settings if specification is None: self.specification = None else: raise NotImplementedError("_StarDataConverter specification input not yet implemented") fileType = fileType and fileType.lower() if fileType not in self.validFileTypes: raise StarValidationError("fileType %s must be one of %s" % (fileType, self.validFileTypes)) self.fileType = fileType self.convertColumnNames = convertColumnNames self.dataExtent = dataExtent # Stack of objects parsed, to give context for error messages self.stack = [] def preValidate(self): self.stack = [] try: for dataBlock in self.dataExtent.values(): self.preValidateDataBlock(dataBlock) except StarValidationError: raise except: print(self._errorMessage('System error:')) raise def convert(self): nmrDataExtent = NmrDataExtent(name=self.dataExtent.name) self.stack = [] try: for dataBlock in self.dataExtent.values(): newDataBlock = self.convertDataBlock(dataBlock) nmrDataExtent.addItem(newDataBlock.name, newDataBlock) except StarValidationError: raise except: print(self._errorMessage('System error:')) raise # return nmrDataExtent def preValidateDataBlock(self, dataBlock): self.stack.append(dataBlock) name = dataBlock.name if name != 'global_' and not name.startswith('data_'): self.raiseValidationError("DataBlock name must be 'global_' or start with 'data_'") for tag, saveFrame in dataBlock.items(): if isinstance(saveFrame, GenericStarParser.SaveFrame): self.preValidateSaveFrame(saveFrame) else: self.raiseValidationError("%s file DataBlock contains non-saveframe element %s:%s" % (self.fileType, tag, saveFrame)) self.stack.pop() def convertDataBlock(self, dataBlock): self.stack.append(dataBlock) # get NmrDataBlock name name = dataBlock.name if name.startswith('data_'): name = name[5:] or '__MissingDataBlockName' elif name == 'global_': name = 'global' # Make NmrDataBlock and connect it nmrDataBlock = NmrDataBlock(name=name) for saveFrame in dataBlock.values(): nmrSaveFrame = self.convertSaveFrame(saveFrame) nmrDataBlock.addItem(nmrSaveFrame.name, nmrSaveFrame) # self.stack.pop() return nmrDataBlock def preValidateSaveFrame(self, saveFrame): self.stack.append(saveFrame) commonPrefix = os.path.commonprefix([tt[0] for tt in saveFrame.items() if isinstance(tt[1], str)]) tt = commonPrefix.split('.', 1) if len(tt) == 2: prefix = tt[0] + '.' else: self.raiseValidationError( "Saveframe tags do not start with a common dot-separated prefix: %s" % [tt[0] for tt in saveFrame.items() if isinstance(tt[1], str)] ) sf_category = saveFrame.get(prefix + 'sf_category') if sf_category is None: self.raiseValidationError("SaveFrame lacks .sf_category item") sf_framecode = saveFrame.get(prefix + 'sf_framecode') if sf_framecode is None: self.raiseValidationError("SaveFrame lacks .sf_framecode item") sf_lowername = saveFrame.name # NB tags are lower-cased from the parser if sf_lowername.startswith('save_'): sf_lowername = sf_lowername[5:] if sf_lowername != sf_framecode.lower(): self.raiseValidationError("Saveframe.name %s does not match sf_framecode %s" % (sf_lowername, sf_framecode)) if self.fileType == 'nef': if not sf_framecode.startswith(sf_category): self.raiseValidationError("NEF file sf_framecode %s does not start with the sf_category %s" % (sf_framecode, sf_category)) if prefix[1:-1] != sf_category: self.raiseValidationError("NEF file sf_category %s does not match tag prefix %s" % (sf_category, prefix)) else: # NBNB TBD We do not check or store the tag prefix pass for tag, value in saveFrame.items(): self.stack.append(tag) if isinstance(value, GenericStarParser.Loop): if tag == value.name: self.preValidateLoop(value) elif not isinstance(value, str): self.raiseValidationError("Saveframe contains item value of wrong type: %s" % value) self.stack.pop() self.stack.pop() def convertSaveFrame(self, saveFrame): self.stack.append(saveFrame) #Get common dot-separated prefix from non-loop items commonPrefix = os.path.commonprefix([tt[0] for tt in saveFrame.items() if isinstance(tt[1], str)]) tt = commonPrefix.split('.', 1) if len(tt) == 2: prefix = tt[0] + '.' else: self.raiseValidationError( "Saveframe tags do not start with a common dot-separated prefix: %s" % [tt[0] for tt in saveFrame.items() if isinstance(tt[1], str)] ) # get category and framecode # The prevalidation has already established that there is exactly one tag for each tags = [x for x in saveFrame if x.endswith('.sf_framecode')] sf_framecode = saveFrame[tags[0]] tags = [x for x in saveFrame if x.endswith('.sf_category')] sf_category = saveFrame[tags[0]] newSaveFrame = NmrSaveFrame(name=sf_framecode, category=sf_category) lowerCaseCategory = newSaveFrame.category.lower() for tag, value in saveFrame.items(): self.stack.append(tag) # if isinstance(value, str): if isinstance(value, UnquotedValue): value = self.convertValue(value, category=lowerCaseCategory, tag=tag) objname = tag[len(prefix):] newSaveFrame.addItem(objname, value) elif isinstance(value, GenericStarParser.Loop): if tag == value._columns[0]: # Only add loop on first appearance nmrLoop = self.convertLoop(value) newSaveFrame.addItem(nmrLoop.name, nmrLoop) self.stack.pop() # self.stack.pop() return newSaveFrame def preValidateLoop(self, loop): self.stack.append(loop) columns = loop._columns commonPrefix = os.path.commonprefix(columns) if len(commonPrefix.split('.', 1)) != 2: self.raiseValidationError( "Column names of %s do not start with a common dot-separated prefix: %s" % (loop, columns) ) self.stack.pop() def convertLoop(self, loop): self.stack.append(loop) oldColumns = loop.columns commonPrefix = os.path.commonprefix(oldColumns) tt = commonPrefix.split('.', 1) if len(tt) == 2: category = tt[0] lenPrefix = len(category) + 1 if category[0] == '_': category = category[1:] else: self.raiseValidationError( "Column names of %s do not start with a common dot-separated prefix: %s" % (loop, oldColumns) ) columns = [] for ss in oldColumns: tag = ss[lenPrefix:] # Check for valid field names if tag and not tag.isalpha(): if self.convertColumnNames: tag = ''.join(x if x.isalnum() else '_' for x in tag) while tag and not tag[0].isalpha(): tag = tag[1:] else: raise ValueError("Invalid column name 1: %s" % ss) if not tag: raise ValueError("Invalid column name 2: %s" % ss) if keyword.iskeyword(tag): raise ValueError("column name (as modified) clashes with Python keyword: %s" % ss) columns.append(tag) newLoop = NmrLoop(category, columns) ff = self.convertValue #convertValue(value, category=lowerCaseCategory, tag=tag) for row in loop.data: values = [ff(x, category, columns[ii]) if isinstance(x, UnquotedValue) else x for ii, x in enumerate(row.values()) ] newLoop.newRow(values) # self.stack.pop() return newLoop def convertValue(self, value, category=None, tag=None): """Convert unquoted string value.""" # assert isinstance(value, GenericStarParser.UnquotedValue) # if self.specification: # # Add specification-dependent processing here # # # return value # Convert special values if value == NULLSTRING: # null value value = None elif value == TRUESTRING: # Boolean True value = True elif value == FALSESTRING: # Boolean False value = False elif value == UNKNOWNSTRING: value = None elif value[0] == '$': # SaveFrame reference value = value[1:] else: if not (tag[-5:] in ('_code', '_name') or '_code_' in tag or '_name_' in tag): # HACK - tags ending in '_code' or '_name' are assumed to be string type # This takes care of e.g. 'sequence_code' # that often might evaluate to a number otherwise try: value = int(value) except ValueError: try: value = float(value) except ValueError: pass # return value def _errorMessage(self, msg): """Make standard error message""" template = "Error in context: %s\n%s" ll = [(x if isinstance(x, str) else x.name) for x in self.stack] return template % (ll, msg) def raiseValidationError(self, msg): raise StarValidationError(self._errorMessage(msg))
[docs]def splitNefSequence(rows): """Split a sequence of nef_sequence dicts assumed to belong to the same chain into a list of lists of sequentially linked stretches following the NEF rules Note that missing linkings are treated as 'middle' and missing start/end tags are ignored, with the first/last residue treated, effectively, as linking 'break' Only unknown linking values and incorrect pairs of 'cyclic' tags raise an error""" result = [] stretch = [] inCyclic = False for row in rows: linking = row.get('linking') if inCyclic and linking not in ('middle', 'cyclic', None): raise ValueError( "Sequence contains 'cyclic' residue(s) that do not form a closed, cyclic molecule" ) if linking == 'cyclic': if inCyclic: # End of cycle inCyclic = False stretch.append(row) result.append(stretch) stretch = [] else: #start of cycle inCyclic = True if stretch: result.append(stretch) stretch = [row] elif linking in ('single', 'nonlinear', 'dummy'): # Always isolated. And last stretch, add new one, and prepare for the next one if stretch: result.append(stretch) result.append([row]) stretch = [] elif linking == 'start': # Start new stretch if stretch: result.append(stretch) stretch = [row] elif linking == 'end': # End stretch stretch.append(row) result.append(stretch) stretch = [] elif linking in ('middle', None): # Continuation (we treat None as 'middle' as the most pragmatic approach # Validation of the NEF standard must be done elsewhere stretch.append(row) elif linking == 'break': # TODO NBNB This follows NEF spec as of July 2016 - which is rather confused # Propose change so that 'break' signals a chain break AFTER that residue, # and is ONLY used if there is a break between two 'middle' residues. if stretch: # inside stretch - end it stretch.append(row) result.append(stretch) stretch = [] else: # Start of stretch - put row on stretch.append(row) else: raise ValueError("Illegal value of nef_sequence.linking: %s" % linking) if stretch: # Add final stretch if still open result.append(stretch) if inCyclic: raise ValueError( "Sequence contains 'cyclic' residue that is not terminated by matching 'cyclic residue" ) # return result