Source code for ccpn.util.nef.NefImporter

"""
NefImporter - a series of routines for reading a Nef file and examining the contents.

Module Contents
===============

Introduction
Error handling
Examples
Nef File Contents


Introduction
------------

NefImporter consists of two classes: NefImporter - a class for handling the top-level object, and
NefDict for handling individual saveFrames in the dictionary.

NefImporter contains:

  initialise          initialise a new dictionary
  loadFile            read in the contents of a .nef file
  saveFile            save the dictionary to a .nef file

  getCategories       return the current categories defined in the Nef structure
  getSaveFrameNames   return the names of the saveFrames with the file
  hasSaveFrame        return True if the saveFrame exists
  getSaveFrame        return saveFrame of the given name
  addSaveFrame        add a new saveFrame to the dictionary


  get<name>           return the relevant structures of the Nef file
                      defined by the available categories, where <name> can be:

                          NmrMetaData
                          MolecularSystems
                          ChemicalShiftLists
                          DistanceRestraintLists
                          DihedralRestraintLists
                          RdcRestraintLists
                          NmrSpectra
                          PeakRestraintLinks

                      e.g. yourImport.getChemicalShiftLists()

  add<name>           add a new saveFrame to the dictionary
                      defined by the available categories, where <name> can be:

                          ChemicalShiftList
                          DistanceRestraintList
                          DihedralRestraintList
                          RdcRestraintList
                          NmrSpectra
                          PeakLists
                          LinkageTables

                      e.g. addChemicalShiftList

  toString            convert Nef dictionary to a string that can be written to a file
  fromString          convert string to Nef dictionary

  getAttributeNames   get a list of the attributes attached to the dictionary
  getAttribute        return the value of the attribute
  hasAttribute        return True if the attribute Exists

  lastError           error code of the last operation
  lastErrorString     error string of the last operation


NefDict contains handling routines:

  getTableNames   return a list of the tables in the saveFrame
  getTable        return table from the saveFrame, it can be returned as an OrderedDict
                  or as a Pandas DataFrame
  hasTable        return true of the table exists
  setTable        set the table - currently not implemented

  getAttributeNames   get a list of the attributes attached to the saveFrame
  getAttribute        return the value of the attribute
  hasAttribute        return True if the attribute Exists

  lastError           error code of the last operation
  lastErrorString     error string of the last operation


Error Handling
--------------

Errors can be handled in three different modes:

  'silent'            errors are handled internally and can be interrogated with saveFrame.lastError
                      with no logging to the stderr

  'standard'          errors are handled internally, error messages are logged to stderr.

  'strict'            errors message are logged to stderr and errors are raised to be trapped by
                      the calling functions

  error handling mode can be set at the instantiation of the object, e.g.

    newObject = NefImporter(errorLogging='standard')


Examples
--------
Here are a few examples of using the classes:

  # load a Nef file
  test = NefImporter(errorLogging=NEF_STANDARD)
  test.loadFile('/Users/account/Projects/NefFile.nef')

  # get categories
  print (test.getCategories())

  # get saveFrame names
  names = test.getSaveFrameNames(); print(names)
  names = test.getSaveFrameNames(returnType=NEF_RETURNALL); print(names)
  names = test.getSaveFrameNames(returnType=NEF_RETURNNEF); print (names)
  names = test.getSaveFrameNames(returnType=NEF_RETURNOTHER); print (names)

  # get a particular saveFrame
  sf1 = test.getSaveFrame(names[0])

  # convert NefImporter into a string for saving
  ts = test.toString()
  test.fromString(ts)

  # getting tables from a saveFrame
  print (sf1.getTableNames())
  table = sf1.getTable('nmr_atom', asPandas=True)
  print (table)
  print (sf1.hasTable('nmr_residue'))
  print (sf1.getAttributeNames())
  print (sf1.hasAttribute('sf_framecode'))
  print (sf1.hasAttribute('nothing'))
  print (test.getSaveFrame(name='ccpn_assignment').getTable(name='nmr_residue', asPandas=True))

  # saving a file
  print ('SAVE ', test.saveFile('/Users/ejb66/PycharmProjects/Sec5Part3testing.nef'))
  print (test.lastError)

  # test meta creation of category names
  print (test.getMolecularSystems())

There are more examples i the __main__ function at the bottom of the module


Nef File Contents
-----------------

More details of the contents of Nef files can be found in GenericStarParser
The general structure of a Nef file is:

::

    DataExtent
      DataBlock
        Item
        Loop
        SaveFrame
          Item
          Loop

DataExtent, DataBlock and SaveFrame are Python OrderedDict with an additional 'name' attribute
DataBlocks and SaveFrames are entered in their container using their name as the key.

Loop is an object with a 'columns' list, a 'data' list-of-row-OrderedDict, and a name attribute
set equal to the name of the first column. A loop is entered in its container under each
column name, so that e.g. aSaveFrame['_Loopx.loopcol1'] and aSaveFrame['_Loopx.loopcol2'] both
exist and both correspond to the same loop object.

Items are entered as a string key - string value pair.
                                    the string value can be a dictionary
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals


#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2019"
__credits__ = ("Ed Brooksbank, Luca Mureddu, Timothy J Ragan & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2017-07-07 16:32:41 +0100 (Fri, July 07, 2017) $"
__version__ = "$Revision: 3.0.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Ed Brooksbank $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================


import os
import sys
import re
import numpy as np
from collections import OrderedDict, namedtuple
from pathlib import Path


#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# this is a fix to get the import to work when running as a standalone
# when importing into your own code, it can be safely removed

[docs]def import_parents(level=1): global __package__ import sys from os import path import importlib # pathlib does all this a lot nicer, but don't think it's in python2.7 top = parent = path.dirname(path.abspath(__file__)) package = [] for t in range(level): package.insert(0, os.path.basename(top)) top = path.dirname(top) sys.path.append(str(top)) try: sys.path.remove(str(parent)) except ValueError: # already removed pass __package__ = str('.'.join(package)) importlib.import_module(__package__)
if __name__ == '__main__' and __package__ is None: import_parents(level=1) #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ from . import StarIo from . import ErrorLog as el from . import Validator from . import Specification MAJOR_VERSION = '1' MINOR_VERSION = '1' PATCH_LEVEL = '0' __nef_version__ = '.'.join((MAJOR_VERSION, MINOR_VERSION)) # __version__ = '.'.join( (__nef_version__, PATCH_LEVEL) ) from . import NEF_ROOT_PATH NEF_DEFAULT_DICT = os.path.join(NEF_ROOT_PATH, 'mmcif_nef_v1_1.dic') NEF_CATEGORIES = [('nef_nmr_meta_data', 'get_nmr_meta_data'), ('nef_molecular_system', 'get_molecular_systems'), ('nef_chemical_shift_list', 'get_chemical_shift_lists'), ('nef_distance_restraint_list', 'get_distance_restraint_lists'), ('nef_dihedral_restraint_list', 'get_dihedral_restraint_lists'), ('nef_rdc_restraint_list', 'get_rdc_restraint_lists'), ('nef_nmr_spectrum', 'get_nmr_spectra'), ('nef_peak_restraint_links', 'get_peak_restraint_links')] NEF_REQUIRED_SAVEFRAME_BY_FRAMECODE = ['nef_nmr_meta_data', 'nef_molecular_system'] NEF_REQUIRED_SAVEFRAME_BY_CATEGORY = ['nef_chemical_shift_list', ] NEF_ALL_SAVEFRAME_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', ] MD_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', 'format_name', 'format_version', 'program_name', 'program_version', 'creation_date', 'uuid'] MD_OPTIONAL_FIELDS = ['coordinate_file_name', ] MD_OPTIONAL_LOOPS = ['nef_related_entries', 'nef_program_script', 'nef_run_history'] MD_RE_REQUIRED_FIELDS = ['database_name', 'database_accession_code'] MD_PS_REQUIRED_FIELDS = ['program_name', ] MD_RH_REQUIRED_FIELDS = ['run_ordinal', 'program_name'] MD_RH_OPTIONAL_FIELDS = ['program_version', 'script_name', 'script'] MS_REQUIRED_FIELDS = ['sf_category', 'sf_framecode'] MS_REQUIRED_LOOPS = ['nef_sequence'] MS_OPTIONAL_LOOPS = ['nef_covalent_links'] MS_NS_REQUIRED_FIELDS = ['chain_code', 'sequence_code', 'residue_type', 'linking', 'residue_variant'] MS_CL_REQUIRED_FIELDS = ['chain_code_1', 'sequence_code_1', 'residue_type_1', 'atom_name_1', 'chain_code_2', 'sequence_code_2', 'residue_type_2', 'atom_name_2'] CSL_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', 'atom_chem_shift_units'] CSL_REQUIRED_LOOPS = ['nef_chemical_shift'] CSL_CS_REQUIRED_FIELDS = ['chain_code', 'sequence_code', 'residue_type', 'atom_name', 'value'] CSL_CS_OPTIONAL_FIELDS = ['value_uncertainty', ] DRL_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', 'potential_type'] DRL_REQUIRED_LOOPS = ['nef_distance_restraint'] DRL_OPTIONAL_FIELDS = ['restraint_origin', ] DRL_DR_REQUIRED_FIELDS = ['ordinal', 'restraint_id', 'chain_code_1', 'sequence_code_1', 'residue_type_1', 'atom_name_1', 'chain_code_2', 'sequence_code_2', 'residue_type_2', 'atom_name_2', 'weight'] DRL_DR_OPTIONAL_FIELDS = ['restraint_combination_id', 'target_value', 'target_value_uncertainty', 'lower_linear_limit', 'lower_limit', 'upper_limit', 'upper_linear_limit'] DIHRL_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', 'potential_type'] DIHRL_REQUIRED_LOOPS = ['nef_dihedral_restraint'] DIHRL_OPTIONAL_FIELDS = ['restraint_origin', ] DIHRL_DIHR_REQUIRED_FIELDS = ['ordinal', 'restraint_id', 'restraint_combination_id', 'chain_code_1', 'sequence_code_1', 'residue_type_1', 'atom_name_1', 'chain_code_2', 'sequence_code_2', 'residue_type_2', 'atom_name_2', 'chain_code_3', 'sequence_code_3', 'residue_type_3', 'atom_name_3', 'chain_code_4', 'sequence_code_4', 'residue_type_4', 'atom_name_4', 'weight'] DIHRL_DIHR_OPTIONAL_FIELDS = ['target_value', 'target_value_uncertainty', 'lower_linear_limit', 'lower_limit', 'upper_limit', 'upper_linear_limit', 'name'] RRL_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', 'potential_type'] RRL_REQUIRED_LOOPS = ['nef_rdc_restraint'] RRL_OPTIONAL_FIELDS = ['restraint_origin', 'tensor_magnitude', 'tensor_rhombicity', 'tensor_chain_code', 'tensor_sequence_code', 'tensor_residue_type', ] RRL_RR_REQUIRED_FIELDS = ['ordinal', 'restraint_id', 'chain_code_1', 'sequence_code_1', 'residue_type_1', 'atom_name_1', 'chain_code_2', 'sequence_code_2', 'residue_type_2', 'atom_name_2', 'weight'] RRL_RR_OPTIONAL_FIELDS = ['restraint_combination_id', 'target_value', 'target_value_uncertainty', 'lower_linear_limit', 'lower_limit', 'upper_limit', 'upper_linear_limit', 'scale', 'distance_dependent', ] PL_REQUIRED_FIELDS = ['sf_category', 'sf_framecode', 'num_dimensions', 'chemical_shift_list'] PL_REQUIRED_LOOPS = ['nef_spectrum_dimension', 'nef_spectrum_dimension_transfer', 'nef_peak'] PL_OPTIONAL_FIELDS = ['experiment_classification', 'experiment_type'] PL_SD_REQUIRED_FIELDS = ['dimension_id', 'axis_unit', 'axis_code'] PL_SD_OPTIONAL_FIELDS = ['spectrometer_frequency', 'spectral_width', 'value_first_point', 'folding', 'absolute_peak_positions', 'is_acquisition', ] PL_SDT_REQUIRED_FIELDS = ['dimension_1', 'dimension_2', 'transfer_type'] PL_SDT_OPTIONAL_FIELDS = ['is_indirect', ] PL_P_REQUIRED_FIELDS = ['ordinal', 'peak_id'] PL_P_REQUIRED_ALTERNATE_FIELDS = [['height', 'volume'], ] PL_P_REQUIRED_FIELDS_PATTERN = ['position_{}', 'chain_code_{}', 'sequence_code_{}', 'residue_type_{}', 'atom_name_{}', ] PL_P_OPTIONAL_ALTERNATE_FIELDS = {r'(height)' : ['{}_uncertainty', ], r'(volume)' : ['{}_uncertainty', ], r'position_([0-9]+)': ['position_uncertainty_{}', ], } PL_P_OPTIONAL_FIELDS_PATTERN = ['position_uncertainty_{}', ] PRLS_REQUIRED_FIELDS = ['sf_category', 'sf_framecode'] PRLS_REQUIRED_LOOPS = ['nef_peak_restraint_link'] PRLS_PRL_REQUIRED_FIELDS = ['nmr_spectrum_id', 'peak_id', 'restraint_list_id', 'restraint_id'] NEF_CATEGORIES_REMOVEPREFIX = {'nef_distance_restraint' : 'distance_restraint', 'nef_molecular_system' : 'molecular_system', 'nef_covalent_links' : 'covalent_links', 'nef_peak_restraint_links' : 'peak_restraint_links', 'nef_run_history' : 'run_history', 'nef_nmr_meta_data' : 'nmr_meta_data', 'nef_rdc_restraint_list' : 'rdc_restraint_list', 'nef_peak_restraint_link' : 'peak_restraint_link', 'nef_nmr_spectrum' : 'nmr_spectrum', 'nef_spectrum_dimension' : 'spectrum_dimension', 'nef_chemical_shift_list' : 'chemical_shift_list', 'nef_sequence' : 'sequence', 'nef_program_script' : 'program_script', 'nef_related_entries' : 'related_entries', 'nef_distance_restraint_list' : 'distance_restraint_list', 'nef_rdc_restraint' : 'rdc_restraint', 'nef_chemical_shift' : 'chemical_shift', 'nef_spectrum_dimension_transfer': 'spectrum_dimension_transfer', 'nef_dihedral_restraint_list' : 'dihedral_restraint_list', 'nef_peak' : 'peak', 'nef_dihedral_restraint' : 'dihedral_restraint'} NEF_CATEGORIES_INSERTPREFIX = dict((val, key) for key, val in NEF_CATEGORIES_REMOVEPREFIX.items()) NEF_RETURNALL = 'all' NEF_RETURNNEF = 'nef_' NEF_RETURNOTHER = 'other' NEF_PREFIX = 'nef_' def _tryNumber(value): if isinstance(value, str): ll = value.rsplit('`', 2) if len(ll) == 3: # name is of form abc`xyz` try: return int(ll[1]) except ValueError: pass REGEXREMOVEENDQUOTES = u'\`\d*`+?' _nameFromCategory = namedtuple('_nameFromCategory', ('framecode', 'frameName', 'subname', 'prefix', 'postfix', 'precode', 'postcode', 'category')) def _saveFrameNameFromCategory(saveFrame: StarIo.NmrSaveFrame): """Parse the saveframe name to extract pre- and post- numbering necessary for restraint and spectrum saveframe names """ category = saveFrame['sf_category'] framecode = saveFrame['sf_framecode'] # frameName = framecode[len(category) + 1:] return _getNameFromCategory(category, framecode) def _getNameFromCategory(category, framecode): # check for any occurrences of `n` in the saveframe name and keep for later reference frameName = framecode[len(category) + 1:] names = re.split(REGEXREMOVEENDQUOTES, frameName) if 0 <= len(names) > 3: raise TypeError('bad splitting of saveframe name {}'.format(framecode)) subName = re.sub(REGEXREMOVEENDQUOTES, '', frameName) matches = [mm for mm in re.finditer(REGEXREMOVEENDQUOTES, frameName)] prefix = matches[0].group() if matches and matches[0] and matches[0].span()[0] == 0 else '' preSerial = _tryNumber(prefix) postfix = matches[-1].group() if matches and matches[-1] and matches[-1].span()[1] == len(frameName) else '' postSerial = _tryNumber(postfix) return _nameFromCategory(framecode, frameName, subName, prefix, postfix, preSerial, postSerial, category)
[docs]class NefImporter(el.ErrorLog): """Object for accessing Nef data tree. The Nef data consist of a single NmrStar dataBlock (an OrderedDict), with (saveFrameName, NmrSaveFrame) key,value pairs """ # put functions in here to read the contents of the dict. # superclassed from DataBlock which is of type StarContainer def __init__(self, programName='Unknown', programVersion='Unknown', errorLogging=el.NEF_STANDARD, hidePrefix = True, ): el.ErrorLog.__init__(self, loggingMode=errorLogging) # self.name = name self.programName = programName self.programVersion = programVersion self._hidePrefix = hidePrefix self._validateNefDict = None self.loadValidateDictionary() self._validator = Validator.Validator() self._isValid = False # No data read so far self._saveFrameNames = {} self._nefDict = None # self._initialise() # initialise a basic object self._path = None @property def data(self) -> StarIo.NmrDataBlock: """Return the NmrDataBlock instance """ return self._nefDict @property def path(self) -> str: """:return the path of the last read Nef file (empty if undefined) """ return '' if self._path is None else self._path def _logFunc(self, *args): """Simple logger for CifDicConverter using _logError """ self._logError(errorString=''.join([str(arg) for arg in args]))
[docs] @el.ErrorLog(errorCode=el.NEFERROR_ERRORLOADINGFILE) def loadValidateDictionary(self, fileName=None, mode='standard'): """Load and parse a Nef dictionary file (in star format) to validate the nef file. :param fileName: path of Nef dictionary file; defaults to current definition dictionary file :param mode: """ if fileName is None: fileName = NEF_DEFAULT_DICT if not isinstance(fileName, (str, Path)): raise RuntimeError('Invalid Nef dictionary file %r' % fileName) fileName = str(fileName) # convert any Path instance, as the downstream routines may fall over _path = os.path.expanduser(fileName) _path = os.path.normpath(_path) if not os.path.isfile(_path): raise RuntimeError('Nef dictionary file "%s" not found' % fileName) with open(_path) as fp: data = fp.read() converter = Specification.CifDicConverter(data, logger=self._logFunc) converter.convertToNef() self._validateNefDict = converter.result return True
def _doValidate(self) -> bool: """Validate the current state of self._nefDict :return True if nefDict validated successfully """ result = self._validator.isValid(self._nefDict, self._validateNefDict) self._isValid = result return result @property def isValid(self) -> bool: """ Check whether the Nef object contains the required information :return True or False: """ return self._isValid @property def validErrorLog(self): """ Return the error log from checking validity :return dict: """ return self._validator._validation_errors def _namedToNefDict(self, frame): # change a saveFrame into a normal OrderedDict newItem = NefDict(inFrame=frame, errorLogging=self.loggingMode) for ky in frame.keys(): newItem[ky] = frame[ky] return newItem def _removePrefix(self, name): if self._hidePrefix: for db in NEF_CATEGORIES_REMOVEPREFIX.keys(): if name.startswith(db): name = name.replace(db, NEF_CATEGORIES_REMOVEPREFIX[db], 1) break return name def _insertPrefix(self, name): if self._hidePrefix: for db in NEF_CATEGORIES_INSERTPREFIX.keys(): if name.startswith(db): name = name.replace(db, NEF_CATEGORIES_INSERTPREFIX[db], 1) break return name @el.ErrorLog(errorCode=el.NEFERROR_BADLISTTYPE) def _getListType(self, _listType): # return a list of '_listType' from the saveFrame, # used with nefCategory routines below if self._nefDict and isinstance(self._nefDict, OrderedDict): sfList = [self._nefDict[db] for db in self._nefDict.keys() if _listType in db] sfList = [self._namedToNefDict(sf) for sf in sfList] # if there is only one item then return it, otherwise return the list if len(sfList) > 1: return sfList elif sfList: return sfList[0] return None # routines to get the Nef specific data from the dictionary
[docs] def getNmrMetaData(self): """ Return the nef_nmr_meta_data saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[0][0])
[docs] def getMolecularSystems(self): """ Return the nef_molecular_system saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[1][0])
[docs] def getChemicalShiftLists(self): """ Return the nef_chemical_shift_list saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[2][0])
[docs] def getDistanceRestraintLists(self): """ Return the nef_distance_restraint_list saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[3][0])
[docs] def getDihedralRestraintLists(self): """ Return the nef_dihedral_restraint_list saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[4][0])
[docs] def getRdcRestraintLists(self): """ Return the nef_rdc_restraint_list saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[5][0])
[docs] def getNmrSpectra(self): """ Return the nef_nmr_spectrum saveFrames :return list or single item: """ return self._getListType(NEF_CATEGORIES[6][0])
def _initialise(self): """ Initialise a new NefImporter object with a starting saveFrame """ nefNmr = 'nef_nmr_meta_data' nefMol = 'nef_molecular_system' nefChem = 'nef_chemical_shift_list_1' self._nefDict[nefNmr] = StarIo.NmrDataBlock() self._nefDict[nefNmr].update({k: '' for k in MD_REQUIRED_FIELDS}) self._nefDict[nefNmr]['sf_category'] = 'nef_nmr_meta_data' self._nefDict[nefNmr]['sf_framecode'] = 'nef_nmr_meta_data' self._nefDict[nefNmr]['format_name'] = 'Nmr_Exchange_Format' self._nefDict[nefNmr]['format_version'] = __nef_version__ self._nefDict[nefNmr]['program_name'] = self.programName self._nefDict[nefNmr]['program_version'] = self.programVersion self._nefDict[nefMol] = StarIo.NmrDataBlock() self._nefDict[nefMol].update({k: '' for k in MS_REQUIRED_FIELDS}) self._nefDict[nefMol]['sf_category'] = 'nef_molecular_system' self._nefDict[nefMol]['sf_framecode'] = 'nef_molecular_system' for l in MS_REQUIRED_LOOPS: self._nefDict['nef_molecular_system'][l] = [] self.addChemicalShiftList(nefChem, 'ppm') self._logError(errorCode=el.NEFVALID)
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addSaveFrame(self, name, category, required_fields=None, required_loops=None): """ Add a new saveFrame to NefImporter :param name: :param category: :param required_fields: :param required_loops: """ name = self._insertPrefix(name) self._nefDict[name] = StarIo.NmrSaveFrame() if required_fields is not None: self._nefDict[name].update({k: '' for k in required_fields}) self._nefDict[name]['sf_category'] = category self._nefDict[name]['sf_framecode'] = name if required_loops is not None: for l in required_loops: self._nefDict[name][l] = [] return self._nefDict[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addChemicalShiftList(self, name, cs_units='ppm'): name = self._insertPrefix(name) category = 'nef_chemical_shift_list' self.addSaveFrame(name=name, category=category, required_fields=CSL_REQUIRED_FIELDS, required_loops=CSL_REQUIRED_LOOPS) self._nefDict[name]['atom_chem_shift_units'] = cs_units return self._nefDict[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addDistanceRestraintList(self, name, potential_type, restraint_origin=None): name = self._insertPrefix(name) category = 'nef_distance_restraint_list' self.addSaveFrame(name=name, category=category, required_fields=DRL_REQUIRED_FIELDS, required_loops=DRL_REQUIRED_LOOPS) self._nefDict[name]['potential_type'] = potential_type if restraint_origin is not None: self._nefDict[name]['restraint_origin'] = restraint_origin return self._nefDict[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addDihedralRestraintList(self, name, potential_type, restraint_origin=None): name = self._insertPrefix(name) category = 'nef_dihedral_restraint_list' self.addSaveFrame(name=name, category=category, required_fields=DIHRL_REQUIRED_FIELDS, required_loops=DIHRL_REQUIRED_LOOPS) self._nefDict[name]['potential_type'] = potential_type if restraint_origin is not None: self._nefDict[name]['restraint_origin'] = restraint_origin return self._nefDict[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addRdcRestraintList(self, name, potential_type, restraint_origin=None, tensor_magnitude=None, tensor_rhombicity=None, tensor_chain_code=None, tensor_sequence_code=None, tensor_residue_type=None): name = self._insertPrefix(name) category = 'nef_rdc_restraint_list' self.addSaveFrame(name=name, category=category, required_fields=DIHRL_REQUIRED_FIELDS, required_loops=RRL_REQUIRED_LOOPS) self._nefDict[name]['potential_type'] = potential_type if restraint_origin is not None: self._nefDict[name]['restraint_origin'] = restraint_origin if tensor_magnitude is not None: self._nefDict[name]['tensor_magnitude'] = tensor_magnitude if tensor_rhombicity is not None: self._nefDict[name]['tensor_rhombicity'] = tensor_rhombicity if tensor_chain_code is not None: self._nefDict[name]['tensor_chain_code'] = tensor_chain_code if tensor_sequence_code is not None: self._nefDict[name]['tensor_sequence_code'] = tensor_sequence_code if tensor_residue_type is not None: self._nefDict[name]['tensor_residue_type'] = tensor_residue_type return self._nefDict[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addPeakList(self, name, num_dimensions, chemical_shift_list, experiment_classification=None, experiment_type=None): name = self._insertPrefix(name) category = 'nef_nmr_spectrum' if chemical_shift_list in self: if self._nefDict[chemical_shift_list]['sf_category'] == 'nef_chemical_shift_list': self.addSaveFrame(name=name, category=category, required_fields=PL_REQUIRED_FIELDS, required_loops=PL_REQUIRED_LOOPS) self._nefDict[name]['num_dimensions'] = num_dimensions self._nefDict[name]['chemical_shift_list'] = chemical_shift_list if experiment_classification is not None: self._nefDict[name]['experiment_classification'] = experiment_classification if experiment_type is not None: self._nefDict[name]['experiment_type'] = experiment_type return self._nefDict[name] raise Exception('{} is not a nef_chemical_shift_list.'.format(chemical_shift_list)) raise Exception('{} does not exist.'.format(chemical_shift_list))
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADADDSAVEFRAME) def addLinkageTable(self): name = category = 'nef_peak_restraint_links' return self.addSaveFrame(name=name, category=category, required_fields=PRLS_REQUIRED_FIELDS, required_loops=PL_REQUIRED_LOOPS)
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADTOSTRING) def toString(self): return self._nefDict.toString()
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADFROMSTRING) def fromString(self, text, mode='standard'): # set the Nef from the contents of the string, opposite of toString dataExtent = StarIo.parseNef(text=text, mode=mode) if dataExtent: dbs = [dataExtent[db] for db in dataExtent.keys()] if dbs: self._nefDict = dbs[0] else: self._logError(errorCode=el.NEFERROR_BADFROMSTRING) self._nefDict = None
[docs] @el.ErrorLog(errorCode=el.NEFERROR_ERRORLOADINGFILE) def loadFile(self, fileName=None, mode='standard') -> StarIo.NmrDataBlock: """Load and parse Nef-file fileName :param fileName: path to a Nef-file :return a NmrDataBlock instance """ if not isinstance(fileName, (str, Path)): raise RuntimeError('Invalid Nef file %r' % fileName) fileName = str(fileName) # convert any Path instance, as the downstream routines may fall over _path = os.path.expanduser(fileName) _path = os.path.normpath(_path) if not os.path.isfile(_path): raise RuntimeError('Nef file "%s" not found' % fileName) nefDataExtent = StarIo.parseNefFile(fileName=fileName, mode=mode) _dataBlocks = list(nefDataExtent.values()) if len(_dataBlocks) > 1: raise RuntimeError('More than one datablock in a NEF file is not allowed. Using the first and discarding the rest.\n') self._nefDict = _dataBlocks[0] self._path = fileName self._doValidate() return self.data
[docs] @el.ErrorLog(errorCode=el.NEFERROR_ERRORLOADINGFILE) def loadText(self, text, mode='standard') -> StarIo.NmrDataBlock: """Load and parse Nef-formatted text :param text: Nef-formatted text :return a NmrDataBlock instance """ nefDataExtent = StarIo.parseNef(text=text, mode=mode) _dataBlocks = list(nefDataExtent.values()) if len(_dataBlocks) > 1: raise RuntimeError('More than one datablock in a NEF file is not allowed. Using the first and discarding the rest.\n') self._nefDict = _dataBlocks[0] self._path = 'loadedFromText' self._doValidate() return self.data
[docs] @el.ErrorLog(errorCode=el.NEFERROR_ERRORSAVINGFILE) def saveFile(self, fileName=None): with open(fileName, 'w') as op: op.write(self._nefDict.toString()) return True
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADCATEGORIES) def getCategories(self): # return a list of the categories available in a Nef file return tuple([self._removePrefix(nm[0]) for nm in NEF_CATEGORIES])
[docs] @el.ErrorLog(errorCode=el.NEFERROR_SAVEFRAMEDOESNOTEXIST) def getSaveFrameNames(self, returnType=NEF_RETURNALL): # return a list of the saveFrames in the file names = [db for db in self._nefDict.keys() if isinstance(self._nefDict[db], StarIo.NmrSaveFrame)] if returnType == NEF_RETURNNEF: names = [self._removePrefix(nm) for nm in names if nm and nm.startswith(NEF_PREFIX)] elif returnType == NEF_RETURNOTHER: names = [nm for nm in names if nm and not nm.startswith(NEF_PREFIX)] else: names = [self._removePrefix(nm) for nm in names] return tuple(names)
[docs] @el.ErrorLog(errorCode=el.NEFERROR_SAVEFRAMEDOESNOTEXIST) def hasSaveFrame(self, name): # return True if the saveFrame exists, else False name = self._insertPrefix(name) return name in self._nefDict
[docs] @el.ErrorLog(errorCode=el.NEFERROR_SAVEFRAMEDOESNOTEXIST) def getSaveFrame(self, name): # return the saveFrame 'name' name = self._insertPrefix(name) return NefDict(self._nefDict[name], errorLogging=self.loggingMode, hidePrefix=self._hidePrefix)
[docs] @el.ErrorLog(errorCode=el.NEFERROR_SAVEFRAMEDOESNOTEXIST) def deleteSaveFrame(self, name): # return True if the saveFrame exists (and delete), else False name = self._insertPrefix(name) if name in self._nefDict: del self._nefDict[name] return True
[docs] @el.ErrorLog(errorCode=el.NEFERROR_SAVEFRAMEDOESNOTEXIST) def renameSaveFrame(self, name, newName): # return True if the saveFrame exists, else False name = self._insertPrefix(name) if name in self._nefDict and newName not in self._nefDict: saveFrame = self._nefDict[name] _frameID = _saveFrameNameFromCategory(saveFrame) framecode, frameName, subName, prefix, postfix, preSerial, postSerial, category = _frameID newSaveFrameName = '_'.join([category, prefix + newName + postfix]) saveFrame.name = newSaveFrameName saveFrame['sf_framecode'] = newSaveFrameName data = [(k, val) for k, val in self._nefDict.items()] for ii, (k, val) in enumerate(data): if val == saveFrame: data[ii] = (newSaveFrameName, val) # should only be one break newData = OrderedDict((k, val) for k, val in data) self._nefDict.clear() self._nefDict.update(newData) if saveFrame.get('name'): saveFrame['name'] = newName
[docs] @el.ErrorLog(errorCode=el.NEFERROR_READATTRIBUTENAMES) def getAttributeNames(self): names = [self._removePrefix(db) for db in self._nefDict.keys() if not isinstance(self._nefDict[db], StarIo.NmrSaveFrame)] return tuple(names)
[docs] @el.ErrorLog(errorCode=el.NEFERROR_READATTRIBUTE) def getAttribute(self, name): name = self._insertPrefix(name) return self._nefDict[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_READATTRIBUTE) def hasAttribute(self, name): name = self._insertPrefix(name) return name in self._nefDict
@property def hidePrefix(self): """defines the current hidePrefix state True - Nef prefixes 'nef_' are hidden False - Nef prefixes 'nef_' are not hidden prefixes are still used in the saveFrames bit not seen in general use :return: the current hidePrefix state """ return self._hidePrefix @hidePrefix.setter def hidePrefix(self, newPrefix): # set the current hidePrefix state # True - Nef prefixes 'nef_' are hidden # False - Nef prefixes 'nef_' can be seen # prefixes are still used in the saveFrames bit not seen in general use if isinstance(newPrefix, bool): self._hidePrefix = newPrefix
[docs] def getName(self, prePend=False) -> str: """Get the name as defined by the NmrDataBlock, optionally pre-pended with 'nefData_' :return the name or '' if undefined """ if self.data is None or self.data.name is None or len(self.data.name) == 0: return '' name = str(self.data.name) if not prePend else 'nefData_' + str(self.data.name) return name
def _attachReader(self, reader): """attach a reader method """ self._reader = reader def _importNef(self, project, *args, **kwds): if hasattr(self, '_reader'): return self._reader(project, *args, **kwds) def _attachVerifier(self, verifier): """attach a verify method """ self._verifier = verifier def _verifyNef(self, project, *args, **kwds): if hasattr(self, '_verifier'): return self._verifier(project, *args, **kwds) def _attachContent(self, content): """attach a content method """ self._content = content def _contentNef(self, project, *args, **kwds): if hasattr(self, '_content'): return self._content(project, *args, **kwds) def _attachClear(self, clr): """attach a clear/reset method """ self._clearNef = clr def _clearNef(self, project, *args, **kwds): if hasattr(self, '_clearNef'): return self._clearNef(project, *args, **kwds) def __str__(self): return '<%s: errorLogging=%r; path=%s>' % (self.__class__.__name__, self._loggingMode, self._path) __repr__ = __str__
[docs]class NefDict(StarIo.NmrSaveFrame, el.ErrorLog): """ An orderedDict saveFrame object for extracting information from the NefImporter """ def __init__(self, inFrame, errorLogging=el.NEF_STANDARD, hidePrefix=True): """ Initialise a NefDict orderedDict object from a given saveFrame :param inFrame: :param errorLogging: :param hidePrefix: """ StarIo.NmrSaveFrame.__init__(self, name=inFrame.name) el.ErrorLog.__init__(self, loggingMode=errorLogging) self._nefFrame = inFrame self._hidePrefix = hidePrefix def _removePrefix(self, name): """ Remove the prefix 'nef_' from the saveFrame category name :param nef_name: :return <name>: """ if self._hidePrefix: for db in NEF_CATEGORIES_REMOVEPREFIX.keys(): if name.startswith(db): name = name.replace(db, NEF_CATEGORIES_REMOVEPREFIX[db], 1) break return name def _insertPrefix(self, name): """ Insert the prefix 'nef_' into the saveFrame category name :param name: :return nef_<name>: """ if self._hidePrefix: for db in NEF_CATEGORIES_INSERTPREFIX.keys(): if name.startswith(db): name = name.replace(db, NEF_CATEGORIES_INSERTPREFIX[db], 1) break return name @el.ErrorLog(errorCode=el.NEFERROR_BADKEYS) def _namedToOrderedDict(self, frame): """ Convert a named saveFrame to a standard orderedDict :param frame: :return orderedDict: """ newItem = OrderedDict() for ky in frame.keys(): newItem[ky] = frame[ky] return newItem
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADTABLENAMES) def getTableNames(self): """ Return list of attributes in the saveFrame :return list or None: """ return tuple([self._removePrefix(db) for db in self._nefFrame.keys() if isinstance(self._nefFrame[db], StarIo.NmrLoop)])
[docs] @el.ErrorLog(errorCode=el.NEFERROR_GENERICGETTABLEERROR) def getTable(self, name=None, asPandas=False): """ Return the table 'name' from the saveFrame if it exists if asPandas is True then return as a pandas dataFrame, otherwise return a list of saveFrames :param name: :param asPandas: :return saveFrames, dataFrames or None: """ # return table 'name' if exists else None thisFrame = None if name: if name in self._nefFrame: thisFrame = self._nefFrame[name] else: # table not found name = self._insertPrefix(name) if name in self._nefFrame: thisFrame = self._nefFrame[name] else: self._logError(errorCode=el.NEFERROR_TABLEDOESNOTEXIST) return None else: tables = self.getTableNames() if tables: thisFrame = self._nefFrame[tables[0]] if asPandas: return self._convertToPandas(thisFrame) else: return [self._namedToOrderedDict(sf) for sf in thisFrame.data]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_BADMULTICOLUMNVALUES) def multiColumnValues(self, column=None): """ Return tuple of orderedDict of values for columns. Will work whether columns are in a loop or single values If columns match a single loop or nothing, return the loop data. Otherwise return a tuple with a single OrderedDict. If no column matches return None If columns match more than one loop throw an error :param column: :return orderedDicts: """ return self._nefFrame.multiColumnValues(column=column)
[docs] @el.ErrorLog(errorCode=el.NEFERROR_TABLEDOESNOTEXIST) def hasTable(self, name): """ Return True if table 'name' exists in the saveFrame :param name: :return True or False: """ # return True is the table exists in the saveFrame name = self._insertPrefix(name) return name in self._nefFrame
[docs] def setTable(self, name): # add the table 'name' to the saveFrame, or replace the existing # does this need to be here or in the main class? sys.stderr.write('Not implemented yet.\n')
def _convertToPandas(self, sf): """ Convert the saveFrame to a Pandas dataFrame :return dataFrame or None on error: """ try: import pandas as pd df = pd.DataFrame(data=sf.data, columns=sf.columns) df.replace({'.': np.NAN, 'true': True, 'false': False}, inplace=True) return df except: return None
[docs] @el.ErrorLog(errorCode=el.NEFERROR_READATTRIBUTENAMES) def getAttributeNames(self): """ Return list of attributes in the saveFrame :return list or None: """ return tuple([self._removePrefix(db) for db in self._nefFrame.keys() if not isinstance(self._nefFrame[db], StarIo.NmrLoop)])
[docs] @el.ErrorLog(errorCode=el.NEFERROR_READATTRIBUTE) def getAttribute(self, name): """ Return attribute 'name' if in the saveFrame :param name: :return attribute: """ name = self._insertPrefix(name) return self._nefFrame[name]
[docs] @el.ErrorLog(errorCode=el.NEFERROR_READATTRIBUTE) def hasAttribute(self, name): """ Return True if attribute 'name' is in the saveFrame :param name: :return True or False: """ name = self._insertPrefix(name) return name in self._nefFrame
@property def hidePrefix(self): # return the current hidePrefix state # True - Nef prefixes 'nef_' are hidden # False - Nef prefixes 'nef_' can be seen # prefixes are still used in the saveFrames bit not seen in general use return self._hidePrefix @hidePrefix.setter def hidePrefix(self, newPrefix): # set the current hidePrefix state # True - Nef prefixes 'nef_' are hidden # False - Nef prefixes 'nef_' can be seen # prefixes are still used in the saveFrames bit not seen in general use if isinstance(newPrefix, bool): self._hidePrefix = newPrefix
if __name__ == '__main__': # define the NefImporter with standard logging test = NefImporter(errorLogging=el.NEF_STANDARD) testPath = os.path.join(os.getcwd(), 'NEF', 'data_1_1', 'CCPN_H1GI_clean.nef') testPathOut = os.path.join(os.getcwd(), 'NEF', 'data_1_1', 'CCPN_H1GI_clean_extended.nef') if not os.path.exists(testPath): raise RuntimeError('Error: %s not found' % testPath) # load a testFile test.loadFile(testPath) # print the categories contained in the nef print(test.getCategories()) names = test.getSaveFrameNames() validNef = NefImporter(errorLogging=el.NEF_STANDARD) # load the validation dictionary - 1.1 DOES NOT WORK WITHOUT parent_category_id infile = 'mmcif_nef.dic' # infile = 'mmcif_nef_v1_1.dic' filePath = os.path.join(os.getcwd(), 'NEF', 'specification', infile) if not os.path.exists(filePath): raise RuntimeError('Error: %s not found' % infile) test.loadValidateDictionary(filePath) validCheck = test.isValid # print out any validation errors print('~~~~~~~~~~~~~~~~~~~~~~\nValid Nef:', validCheck) if not validCheck: print('Error Nef:', test.validErrorLog) print('Error Nef:') for k, v in test.validErrorLog.items(): print(' >>>', k) for err in v: print(' >>> ', err) print('~~~~~~~~~~~~~~~~~~~~~~') # examples of printing saveFrames information print(names) names = test.getSaveFrameNames(returnType=NEF_RETURNALL) print(names) names = test.getSaveFrameNames(returnType=NEF_RETURNNEF) print(names) names = test.getSaveFrameNames(returnType=NEF_RETURNOTHER) print(names) # testing the different types of errors trapping - try to find a saveFrame called 'notFound' sf1 = None if names: sf1 = test.getSaveFrame(names[0]) sf2 = test.getSaveFrame('notFound') print(test.hasSaveFrame('notFound')) if names: print(test.hasSaveFrame(names[0])) ts = test.toString() test.fromString(ts) # testing the different errors from searching for saveFrames if sf1 is not None: # get the list of tables print(sf1.name) print(sf1.getTableNames()) # get a table as a pandas dataframe table = sf1.getTable() sf1.getTable('nmr_atom', asPandas=True) table = sf1.getTable('nmr_atom', asPandas=True) print(table) print(sf1.hasTable('notFound')) print(sf1.hasTable('nmr_residue')) print(sf1.getAttributeNames()) print(sf1.hasAttribute('sf_framecode')) print(sf1.hasAttribute('nothing')) print(sf1.getAttribute('notHere')) print(sf1.getAttribute('sf_category')) print('Testing getTable') try: print(test.getSaveFrame(name='ccpn_assignment').getTable()) except Exception as es: print('Error: %s' % str(es)) try: print(test.getSaveFrame(name='ccpn_assignment').getTable(name='nmr_residue', asPandas=True)) except Exception as es: print('Error: %s' % str(es)) try: print(test.getSaveFrame(name='ccpn_assignment').getTable(name='notFound', asPandas=True)) except Exception as es: print('Error: %s' % str(es)) # print('Testing saveFile') # print('SAVE ', test.saveFile(testPathOut)) print(test.lastError) # test meta creation of category names print(test.getNmrMetaData()) print(test.getMolecularSystems()) print(test.getDihedralRestraintLists()) print(test.getDistanceRestraintLists()) print(test.getChemicalShiftLists()) print(test.getNmrSpectra()) print(test.getPeakRestraintLinks()) print(test.getRdcRestraintLists()) if sf1: sf1.loggingMode = el.NEF_STRICT sf1.loggingMode = el.NEF_STANDARD sf1.loggingMode = el.NEF_SILENT print(sf1.loggingMode) print(sf1.hidePrefix) if test: test.loggingMode = el.NEF_STANDARD test.loggingMode = el.NEF_SILENT test.loggingMode = el.NEF_STRICT print(test.getAttributeNames()) print(test.loggingMode) print(test.hidePrefix) print('Valid Nef:', test.isValid) print('Valid Nef:', test.validErrorLog)