# Source code for ccpn.util.nef.Specification

"""Code for handling NEF specification and metadata

"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2021-05-10 18:47:35 +0100 (Mon, May 10, 2021) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

import sys

from . import GenericStarParser
from . import StarIo


INFOPREFIX = 'INFO: '


# TODO, This is a DRAFT only - not used and not currently functional.
# May be upgraded later, for specification-aware NEF I/O


def getCcpnSpecification(filePath):
    """Get NEF specification summary with ccpn-specific additions.

    :param filePath: path of the specification (.dic) file to read.
    :return: the result of CifDicConverter.convertToNef() for the file contents,
             converted with the 'ccpn_additions' datablock as an additional block.
    """
    # Read inside a context manager so the file handle is closed deterministically
    with open(filePath) as fp:
        inputText = fp.read()

    converter = CifDicConverter(inputText, additionalBlocks=('ccpn_additions',))
    #
    return converter.convertToNef()
class CifDicConverter(object):
    """Converts mmcif .dic file, with program-specific additions datablocks
    into a single NEF data structure, containing:

    1) a nef_specification saveframe, containing a dictionary_history loop
       and an item_type_list loop

    2) A saveframe for each saveframe category in the specification.

    Each saveframe contains items:

        _nef_saveframe.sf_framecode
        _nef_saveframe.sf_category
        _nef_saveframe.is_mandatory
        _nef_saveframe.description
        _nef_saveframe.example

    A table for contained loops:

        loop_
          _nef_loop.category
          _nef_loop.is_mandatory
          _nef_loop.description
          _nef_loop.example

    And a table for contained items and loop columns:

        loop_
          _nef_item.name
          _nef_item.loop_category
          _nef_item.type_code
          _nef_item.is_mandatory
          _nef_item.is_key
          _nef_item.example_1
          _nef_item.example_2
          _nef_item.description

    The loop_category defines which loop the item belongs to
    (if empty it belongs directly inside the saveframe)
    """

    def __init__(self, inputText, skipExamples=True, additionalBlocks=(), logger=None):
        """Parse the specification text and initialise the conversion state.

        :param inputText: text of the specification file, passed to GenericStarParser.parse.
        :param skipExamples: if True, examples are replaced by the string 'omitted'
                             in saveframe and loop descriptions.
        :param additionalBlocks: names of extra datablocks to convert after the first one.
        :param logger: optional callable taking a single message string; defaults to print.
        :raises TypeError: if logger is given but is not callable.
        """
        self.specification = GenericStarParser.parse(inputText)
        self.additionalBlocks = additionalBlocks

        # (category, tag) -> owning saveframe/loop name; entries are removed again
        # as the matching items are found by extractItemDescription
        self.keyTags = {}
        self.result = None
        self.skipExamples = skipExamples

        # category (saveframe or loop) -> output saveframe that describes it
        self._category2SaveFrame = {}

        if logger and not callable(logger):
            raise TypeError('logger must be callable')
        self._logFunc = logger if logger else print

    def _logging(self, *args):
        """Log messages as required.

        The arguments are converted with str(), joined with spaces, prefixed with
        INFOPREFIX and passed to the log function. Nothing happens if no log
        function is set.
        """
        if not self._logFunc:
            return

        try:
            self._logFunc('{}{}'.format(INFOPREFIX, ' '.join([str(arg) for arg in args])))
        except Exception as es:
            # best-effort: report the failure through the same channel rather than abort the conversion
            self._logFunc('{}>>> Error during logging: {}'.format(INFOPREFIX, str(es)))

    def _registerKeyTags(self, inputSaveFrame, owner, *logContext):
        """Record the _category_key.name entries of inputSaveFrame in self.keyTags.

        Keys are stored as (category, tag), with the leading '_' of the category removed,
        mapped to owner. A key name without an internal '.' is logged (with logContext)
        and skipped, as it cannot be split into category and tag.
        """
        for row in inputSaveFrame.multiColumnValues(('_category_key.name',)):
            parts = list(row.values())[0].split('.', 1)
            if len(parts) != 2:
                self._logging("key lacks internal '.'", *logContext, parts)
                continue
            self.keyTags[(parts[0][1:], parts[1])] = owner

    def _reportUnexpectedTags(self, inputSaveFrame, expectedTags):
        """Log every tag of inputSaveFrame that is not listed in expectedTags."""
        for tag in inputSaveFrame:
            if tag not in expectedTags:
                self._logging("Unexpected item in %s:" % inputSaveFrame['_category.id'],
                              tag, inputSaveFrame.get(tag))

    def convertToNef(self):
        """Convert RCSB .cif file into a nef specification summary file.

        :return: the StarIo.NmrDataBlock holding the summary (also stored as self.result).
        :raises ValueError: if one of the additional blocks is missing from the specification.
        """
        # NOTE - this assumes a single datablock.

        # set up
        rcsbDataBlock = list(self.specification.values())[0]
        result = self.result = StarIo.NmrDataBlock(name='specification')

        # make specific content saveframes
        self.extractGeneralDataFrame(rcsbDataBlock)

        dataBlocks = [rcsbDataBlock] + [self.specification.get(tag) for tag in self.additionalBlocks]
        if None in dataBlocks:
            ii = dataBlocks.index(None)
            # dataBlocks is offset by one relative to additionalBlocks (the main block comes first)
            raise ValueError("Specification file has no data block matching %s"
                             % repr(self.additionalBlocks[ii - 1]))

        for dataBlock in dataBlocks:
            toSaveFrames, toLoops, toItems = extractByCategories(dataBlock)

            # Order matters: loops look up their parent saveframe, items look up their saveframe/loop
            self._logging('%s SAVEFRAMES' % len(toSaveFrames))
            for xx in toSaveFrames:
                self.extractSaveFrameDescription(xx)
            self._logging('saveframes are', list(self.result.keys()))

            self._logging('%s LOOPS' % len(toLoops))
            for xx in toLoops:
                self.extractLoopDescription(xx)

            self._logging('%s ITEMS' % len(toItems))
            for xx in toItems:
                self.extractItemDescription(xx)

        # error check: every key tag should have been consumed by a matching item
        if self.keyTags:
            self._logging("Error. unused keys:")
            for tt in self.keyTags:
                self._logging(tt)
            self._logging()
        #
        return result

    def extractGeneralDataFrame(self, rcsbDataBlock):
        """Extract general data saveframe.

        Creates the 'nef_specification' saveframe in self.result with the dictionary
        version, the dictionary history loop and the item type list loop.

        :param rcsbDataBlock: datablock to read the general data from.
        :return: the new saveframe.
        """
        saveFrame = self.result.newSaveFrame('nef_specification', category='nef_specification')
        saveFrame.addItem('version', rcsbDataBlock.get('_dictionary.version'))

        # VersionHistory loop
        transferLoop(rcsbDataBlock, saveFrame,
                     ('_dictionary_history.version', '_dictionary_history.update',
                      '_dictionary_history.revision'))

        # ItemType loop
        typeLoop = transferLoop(rcsbDataBlock, saveFrame,
                                ('_item_type_list.code', '_item_type_list.primitive_code',
                                 '_item_type_list.construct', '_item_type_list.detail'))

        # transferLoop returns None when the datablock holds no matching data
        if typeLoop is not None:
            # Strip off spaces
            for row in typeLoop.data:
                # a missing detail is transferred as None; treat it as empty
                detail = (row['detail'] or '').strip()
                if detail:
                    ll = detail.splitlines()
                    row['detail'] = '\n'.join(x.strip() for x in ll) + '\n'
        #
        return saveFrame

    def extractSaveFrameDescription(self, inputSaveFrame):
        """Extract saveframe description.

        Creates a 'nef_saveframe_<category>' saveframe in self.result, registers it
        under its category and records its key tags for later use.

        :param inputSaveFrame: specification saveframe describing a saveframe category.
        """
        expectedTags = ('_category.description', '_category.id', '_category.mandatory_code',
                        '_category_group.id', '_category_key.name', '_category_examples.case',
                        '_category_examples.detail',)

        metaCategory = 'nef_saveframe'
        category = inputSaveFrame['_category.id']
        name = '%s_%s' % (metaCategory, category)
        saveFrame = self.result.newSaveFrame(name, category=metaCategory)
        self._category2SaveFrame[category] = saveFrame

        saveFrame.addItem('is_mandatory', inputSaveFrame.get('_category.mandatory_code') == 'yes')
        saveFrame.addItem('description', inputSaveFrame.get('_category.description'))

        data = inputSaveFrame.multiColumnValues(('_category_examples.detail',
                                                 '_category_examples.case',))
        examples = [x['_category_examples.case'] for x in data]
        if len(examples) == 1:
            if self.skipExamples:
                example = 'omitted'
            else:
                example = examples[0]
            saveFrame.addItem('example', example)
        elif examples:
            # more than one example: none is stored, they are only reported
            self._logging("Multiple examples for %s" % name)
            for dd in data:
                self._logging(dd['_category_examples.detail'], dd['_category_examples.case'])

        # Get keytags for later use
        self._registerKeyTags(inputSaveFrame, name, name)

        # Check for untreated tags
        self._reportUnexpectedTags(inputSaveFrame, expectedTags)

    def extractLoopDescription(self, inputSaveFrame):
        """Extract loop description.

        Adds a row describing the loop to the 'nef_loop' loop of its parent saveframe
        (creating that loop if necessary) and records the loop key tags for later use.
        A loop without a parent category, or whose parent saveframe is unknown,
        is logged and skipped.

        :param inputSaveFrame: specification saveframe describing a loop category.
        """
        expectedTags = ('_category.description', '_category.id', '_category.parent_category_id',
                        '_category.mandatory_code', '_category_group.id', '_category_key.name',
                        '_category_examples.case', '_category_examples.detail',)

        name = inputSaveFrame['_category.id']
        parentCategory = inputSaveFrame.get('_category.parent_category_id')
        if parentCategory is None:
            self._logging("loop is missing _category.parent_category_id:", name)
            return

        # NOTE:ED now need to search the previous categories for the container
        parent = self._category2SaveFrame.get(parentCategory)
        if parent is None:
            self._logging("loop is missing parent saveFrame:", name, parentCategory,
                          list(self._category2SaveFrame.keys()))
            return

        # items of this loop are later looked up through the loop category
        self._category2SaveFrame[name] = parent

        # get example
        data = inputSaveFrame.multiColumnValues(('_category_examples.detail',
                                                 '_category_examples.case',))
        examples = [x['_category_examples.case'] for x in data]
        if examples:
            example = 'omitted'
            if not self.skipExamples:
                if len(examples) == 1:
                    example = examples[0]
                else:
                    self._logging("Multiple examples for %s" % name)
                    for dd in data:
                        self._logging(dd['_category_examples.detail'],
                                      dd['_category_examples.case'])
        else:
            example = None

        # make loop
        loop = parent.get('nef_loop')
        if loop is None:
            loop = parent.newLoop('nef_loop', ('category', 'is_mandatory', 'description', 'example'))
        loop.newRow(dict(
                category=name,
                is_mandatory=inputSaveFrame.get('_category.mandatory_code') == 'yes',
                description=inputSaveFrame.get('_category.description'),
                example=example)
        )

        # Get keytags for later use
        self._registerKeyTags(inputSaveFrame, name, parentCategory, name)

        # Check for untreated tags
        self._reportUnexpectedTags(inputSaveFrame, expectedTags)

    def extractItemDescription(self, inputSaveFrame):
        """Extract item description.

        Adds a row describing the item to the 'nef_item' loop of the saveframe
        registered for the item category (creating that loop if necessary).

        :param inputSaveFrame: specification saveframe describing an item.
        :raises ValueError: if no saveframe is registered for the item category.
        """
        # Tags read or anticipated for an item description; listed for reference only -
        # unlike for saveframes and loops, other tags are not reported here.
        expectedTags = ('_item_description.description', '_item.name', '_item.mandatory_code',
                        '_item.category_id', '_item_type.code', '_item_examples.case',
                        '_item_examples.detail',)

        # get data
        name = inputSaveFrame.get('_item.name').split('.', 1)[1]
        category = inputSaveFrame.get('_item.category_id')

        # consume the key tag, so that unused keys can be reported at the end
        isKey = (category, name) in self.keyTags
        if isKey:
            del self.keyTags[(category, name)]

        saveFrame = self._category2SaveFrame.get(category)
        if saveFrame is None:
            raise ValueError("SaveFrame named %s not found in list: %s"
                             % (category, list(self._category2SaveFrame.keys())))
        if saveFrame.name == 'nef_saveframe_' + category:
            # item lives in a saveframe, not a loop
            category = None

        isMandatory = inputSaveFrame.get('_item.mandatory_code') == 'yes'
        description = inputSaveFrame.get('_item_description.description')
        typeCode = inputSaveFrame.get('_item_type.code')

        # get examples
        data = inputSaveFrame.multiColumnValues(('_item_examples.detail', '_item_examples.case',))
        examples = [x['_item_examples.case'] for x in data]
        if len(examples) > 2:
            # only the first two examples are stored
            self._logging("More than two examples for %s" % name)
        # pad so that example_1 and example_2 can always be filled
        while len(examples) < 2:
            examples.append(None)

        # Add item to loop, making it if necessary
        specificationLoop = saveFrame.get('nef_item')
        if specificationLoop is None:
            specificationLoop = saveFrame.newLoop('nef_item',
                                                  ('name', 'loop_category', 'type_code',
                                                   'is_mandatory', 'is_key', 'example_1',
                                                   'example_2', 'description'))
        specificationLoop.newRow(dict(name=name, loop_category=category, type_code=typeCode,
                                      is_mandatory=isMandatory, is_key=isKey,
                                      example_1=examples[0], example_2=examples[1],
                                      description=description))
def extractByCategories(rcsbDataBlock):
    """Get saveFrames describing SaveFrames, Loops, and items, respectively.

    Only entries whose tag starts with 'save_' are considered:

    - an entry without '_category.id' describes an item;
    - an entry whose '_category_key.name' values include '_<category>.sf_framecode'
      describes a saveframe;
    - any other entry with a '_category.id' describes a loop.

    :param rcsbDataBlock: mapping of tag -> saveframe to classify.
    :return: tuple (toSaveFrames, toLoops, toItems) of lists, in input order.
    """
    toSaveFrames = []
    toLoops = []
    toItems = []

    for tag, saveFrame in rcsbDataBlock.items():
        if not tag.startswith('save_'):
            continue

        category = saveFrame.get('_category.id')
        if category is None:
            toItems.append(saveFrame)
            continue

        saveFrameCodeTag = '_%s.sf_framecode' % category
        keyNamesData = saveFrame.multiColumnValues(('_category_key.name',))
        # each row holds a single column; compare its value (an empty row never matches)
        if any(next(iter(row.values()), None) == saveFrameCodeTag for row in keyNamesData):
            toSaveFrames.append(saveFrame)
        else:
            toLoops.append(saveFrame)
    #
    return (toSaveFrames, toLoops, toItems)
def transferLoop(genericContainer, saveFrame, inputTags):
    """Transfer category.tag_x, ... to loop named category with tags tag_x etc.

    :param genericContainer: source container; its multiColumnValues(inputTags) provides the rows.
    :param saveFrame: target saveframe, in which the new loop is created.
    :param inputTags: tags of the form '_category.tag', all sharing the same category.
    :return: the new loop, or None if the container holds no data for the tags
             (in which case no loop is created).
    :raises ValueError: if a tag is not of the form '_category.tag', or if the tags
                        do not share exactly one category prefix.
    """
    prefixes = set()
    columns = []
    for tag in inputTags:
        parts = tag.split('.')
        if len(parts) == 2 and parts[0].startswith('_'):
            columns.append(parts[1])
            prefixes.add(parts[0][1:])
        else:
            raise ValueError("Tag %s is not of form _xyz.abc" % tag)

    if len(prefixes) != 1:
        raise ValueError("tags must share exactly one prefix, found: %s" % sorted(prefixes))

    category = prefixes.pop()
    data = genericContainer.multiColumnValues(inputTags)
    if data:
        loop = saveFrame.newLoop(category, columns=columns)
        for row in data:
            # keep the column order of inputTags; missing values become None
            loop.newRow([row.get(tag) for tag in inputTags])
        return loop
    #
    return None
if __name__ == '__main__':
    # Command-line use: convert the specification file given as first argument
    # and print the resulting NEF summary to standard output.
    args = sys.argv
    if len(args) < 2:
        # report usage errors on stderr and signal failure to the calling shell
        print("Error, input file name is mandatory", file=sys.stderr)
        sys.exit(1)

    infile = args[1]
    with open(infile) as fp:
        data = fp.read()

    converter = CifDicConverter(data)
    converter.convertToNef()
    print(converter.result.toString())