
"""
This file contains the Bruker data access class.
It serves as an interface between the V3 Spectrum class and the actual spectral data.
Some routines rely on code from the Nmrglue package, included in the miniconda distribution.

See SpectrumDataSourceABC for a description of the methods
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Geerten Vuister $"
__dateModified__ = "$dateModified: 2021-12-23 11:27:16 +0000 (Thu, December 23, 2021) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: gvuister $"
__date__ = "$Date: 2020-11-20 10:28:48 +0000 (Fri, November 20, 2020) $"
#=========================================================================================
# Start of code
#=========================================================================================

import os
from typing import Sequence

from ccpn.util.Path import Path, aPath
from ccpn.util.Logging import getLogger
from ccpn.util.Common import flatten
import ccpn.core.lib.SpectrumLib as specLib
from ccpn.core.lib.SpectrumDataSources.SpectrumDataSourceABC import SpectrumDataSourceABC


class BrukerSpectrumDataSource(SpectrumDataSourceABC):
    """
    Bruker spectral data reading
    Initialization can be with:
    - a directory with Bruker data
    - a directory with Bruker processed data (pdata/x)
    - Bruker processed data [1r, 2rr, etc]
    """
    dataFormat = 'Bruker'
    isBlocked = True
    wordSize = 4
    headerSize = 0
    blockHeaderSize = 0
    isFloatData = False
    suffixes = [None]
    allowDirectory = True  # Can supply a Bruker top directory or pdata directory
    openMethod = open
    defaultOpenReadMode = 'rb'

    _processedDataFilesDict = dict([
        (1, '1r 1i'.split()),
        (2, '2rr 2ri 2ir 2ii'.split()),
        (3, '3rrr 3rri 3rir 3rii 3irr 3iri 3iir 3iii'.split()),
        (4, '4rrrr 4rrri 4rrir 4rrii 4rirr 4riri 4riir 4riii 4irrr 4irri 4irir 4irii 4iirr 4iiri 4iiir 4iiii'.split()),
        (5, '5rrrrr'),
        (6, '6rrrrrr'),
        (7, '7rrrrrrr'),
        (8, '8rrrrrrrr'),
        ])
    _processedDataFiles = [f for f in flatten(_processedDataFilesDict.values())]  # all the possible processed data files as a list

    _acqusFiles = 'acqus acqu2s acqu3s acqu4s acqu5s acqu6s acqu7s acqu8s'.split()
    _procFiles = 'procs proc2s proc3s proc4s proc5s proc6s proc7s proc8s'.split()

    _PDATA = 'pdata'

    termDict = {'AQ'          : 'acquisition time in seconds',
                'AMP'         : 'amplitude of pulses',
                'AQ_mod'      : 'acquisition mode',
                'AQSEQ'       : '3D acquisition order',
                'AUNM'        : 'name of an acquisition AU program',
                'BF(1-8)'     : 'basic frequency for frequency channel f(1-8)',
                'BYTORDP'     : 'Byte order (endianness)',
                'CNST'        : 'constant used in pulse programs',
                'CPDPRG(1-8)' : 'names of CPD programs',
                'D'           : 'time delays',
                'DDR'         : 'digital digitizer resolution',
                'DE'          : 'pre-scan delay',
                'DECIM'       : 'decimation factor of the digital filter',
                'DIGMOD'      : 'digitizer mode',
                'DIGTYP'      : 'digitizer type',
                'DQDMODE'     : 'sign of the frequency shift during digital quadrature detection',
                'DR'          : 'digitizer resolution',
                'DS'          : 'number of dummy scans',
                'DSLIST'      : 'dataset list',
                'DSPFIRM'     : 'firmware used for digital filtering',
                'DW'          : 'dwell time',
                'DWOV'        : 'oversampling dwell time',
                'EXP'         : 'experiment performed',
                'FCUCHAN'     : 'routing between logical frequency channels and FCUs',
                'FnMODE'      : 'Acquisition mode of the indirect dimensions (2D and 3D)',
                'FW'          : 'analog filter width',
                'FIDRES'      : 'FID resolution',
                'FQ1LIST'     : 'irradiation frequency lists',
                'GP031'       : 'gradient parameter table',
                'GRDPROG'     : 'gradient program name',
                'HDDUTY'      : 'homodecoupling duty cycle (in percent)',
                'HPMOD'       : 'routing between high power amplifiers and preamplifier modules',
                'HPPRGN'      : 'high power preamplifier gain',
                'INP'         : 'increment for pulse P',
                'IN'          : 'increment for delay D',
                'L'           : 'loop counter',
                'LOCNUC'      : 'lock nucleus',
                'MASR'        : 'MAS spin rate',
                'NBL'         : 'number of blocks (of acquisition memory)',
                'ND0'         : 'number of delays D0',
                'ND10'        : 'number of delays D10',
                'NS'          : 'number of scans',
                'NUC(1-8)'    : 'nucleus for frequency channel f(1-8)',
                'O(1-8)'      : 'irradiation frequency offset for frequency channel f(1-8) in Hz',
                'O(1-8)P'     : 'irradiation frequency offset for frequency channel f(1-8) in ppm',
                'OVERFLW'     : 'data overflow check',
                'P'           : 'pulse length',
                'PARMODE'     : 'dimensionality of the raw data',
                'PHCOR'       : 'correction angle for phase programs',
                'PCPD'        : 'CPD pulse length',
                'PH_ref'      : 'receiver phase correction',
                'PL'          : 'power level',
                'POWMOD'      : 'power mode',
                'PRECHAN'     : 'routing between Switchbox outputs and Preamplifier modules',
                'PROSOL'      : 'copy prosol parameters to corresponding acquisition parameters',
                'PRGAIN'      : 'high power preamplifier gain',
                'PULPROG'     : 'pulse program used for the acquisition',
                'QNP'         : 'QNP nucleus selection',
                'RECCHAN'     : 'receiver channel',
                'RG'          : 'receiver gain',
                'RO'          : 'sample rotation frequency in Hz',
                'RSEL'        : 'routing between FCUs and amplifiers',
                'SEOUT'       : 'SE 451 receiver unit output to be used',
                'SFO(1-8)'    : 'irradiation (carrier) frequencies for channel f(1-8)',
                'SP07'        : 'shaped pulse parameter table',
                'SOLVENT'     : 'the sample solvent',
                'SW'          : 'spectral width in ppm',
                'SW_h'        : 'spectral width in Hz',
                'SWIBOX'      : 'routing between Switchbox inputs and Switchbox outputs',
                'TD'          : 'time domain; number of raw data points',
                'TD0'         : 'loop counter for multidimensional experiments',
                'TE'          : 'demand temperature on the temperature unit',
                'V9'          : 'maximum variation of a delay',
                'VALIST'      : 'variable amplitude (power) list',
                'VCLIST'      : 'variable counter list',
                'VDLIST'      : 'variable delay list',
                'VPLIST'      : 'variable pulse list',
                'VTLIST'      : 'variable temperature list',
                'WBST'        : 'number of wobble steps',
                'WBSW'        : 'wobble sweep width',
                'ZGOPTNS'     : 'acquisition (zg) options',
                }

    def __init__(self, path=None, spectrum=None):
        "Init local attributes"
        self._pdataDir = None
        self._brukerRoot = None
        self.acqus = None
        self.procs = None
        super().__init__(path, spectrum)

    def _isBrukerTopDir(self, path):
        """Return True if path (of type Path) is a Bruker top directory with acqu* files
        and a pdata directory
        """
        hasAcquFiles = len(path.globList('acqu*')) > 0
        pdata = path / self._PDATA
        return path is not None and path.exists() and path.is_dir() and \
               pdata.exists() and pdata.is_dir() and \
               hasAcquFiles

    def _checkBrukerTopDir(self, path):
        """Check if path (of type Path) is a Bruker top directory with acqu* files and a pdata directory;
        raise RuntimeError on problems
        """
        hasAcquFiles = len(path.globList('acqu*')) > 0
        pdata = path / self._PDATA

        if path is None or not path.exists():
            errorMsg = 'Bruker top directory "%s": does not exist' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

        if not path.is_dir():
            errorMsg = 'Bruker top directory "%s": is not a directory' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

        if not hasAcquFiles:
            errorMsg = 'Bruker top directory "%s": has no acqu* files' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

        if not pdata.exists() or not pdata.is_dir():
            errorMsg = 'Bruker top directory "%s": has no valid pdata directory' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

    def _hasBinaryData(self, path=None):
        """Return True if path (defaults to self._pdataDir) contains binary data
        """
        if path is None:
            path = self._pdataDir
        return len(path.globList('[1-8][r,i]*')) > 0

    def _isBrukerPdataDir(self, path):
        """Return True if path (of type Path) is a Bruker pdata directory with proc files and binary data
        """
        if path is None:
            return False
        hasProcFiles = len(path.globList('proc*')) > 0
        hasBrukerTopdir = self._isBrukerTopDir(path.parent.parent)
        hasPdata = path.parent.stem == self._PDATA
        return path.exists() and path.is_dir() and \
               hasPdata and hasBrukerTopdir and hasProcFiles

    def _checkBrukerPdataDir(self, path):
        """Check if path (of type Path) is a valid Bruker pdata directory with proc* and
        binary data [r/i]* files; raise RuntimeError on problems
        """
        if path is None or not path.exists():
            errorMsg = 'Bruker pdata directory "%s": does not exist' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

        if not path.is_dir():
            errorMsg = 'Bruker pdata directory "%s": is not a directory' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

        hasProcFiles = len(path.globList('proc*')) > 0
        if not hasProcFiles:
            errorMsg = 'Bruker pdata directory "%s": has no proc* files' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

        if not self._hasBinaryData():
            errorMsg = 'Bruker pdata directory "%s": has no valid processed data' % path
            getLogger().debug2(errorMsg)
            raise RuntimeError(errorMsg)

    def _getDimensionality(self):
        """Return dimensionality from proc files in self._pdataDir
        """
        dimensionality = 0
        for proc in self._procFiles:
            _p = self._pdataDir / proc
            if not _p.exists():
                break
            dimensionality += 1
        return dimensionality

    def _findFirstPdataDir(self):
        """Find and return first pdata subdir with valid data, starting from Bruker topDir
        :return path of valid pdata 'proc'-directory or None if not found
        """
        _pdata = self._brukerRoot / self._PDATA
        _procDirs = []
        # add numeric proc dirs only; to sort later
        for p in _pdata.globList('[1-9]*'):
            try:
                _procDirs.append(int(p.basename))
            except:
                pass
        _procDirs.sort(key=int)

        for p in _procDirs:
            _path = _pdata / str(p)
            if self._isBrukerPdataDir(_path) and self._hasBinaryData(_path):
                return _path

        return None
    def nameFromPath(self):
        """Return a name derived from _brukerRoot and pdataDir
        """
        return '%s-%s' % (self._brukerRoot.stem, self._pdataDir.stem)
    @property
    def parentPath(self) -> aPath:
        """Return an absolute path of parent directory, i.e. _BrukerRoot, as a Path instance
        or None when dataFile is not defined.
        Subclassed to accommodate the special Bruker directory structure
        """
        return (None if self.dataFile is None else self._brukerRoot.parent)
    def setPath(self, path, substituteSuffix=False):
        """Parse and set path, assure there is the directory with acqus and pdata dirs;
        set the _brukerRoot and _pdataDir attributes to point to the relevant directories.
        Return self or None on error
        """
        logger = getLogger()

        self._pdataDir = None
        self._brukerRoot = None
        self._binaryData = None
        self.acqus = None  # list of the acqu files, parsed as dicts
        self.procs = None  # list of the proc files, parsed as dicts

        if path is None:
            _path = None

        else:
            _path = Path(path)

            if _path.is_file() and _path.stem in self._processedDataFiles:
                # Bruker binary processed data file
                self._pdataDir = _path.parent
                self._brukerRoot = _path.parent.parent.parent
                self._binaryData = _path

            elif _path.is_file() and _path.stem in self._procFiles:
                # Bruker proc file
                self._pdataDir = _path.parent
                self._brukerRoot = _path.parent.parent.parent
                self._binaryData = None

            elif self._isBrukerTopDir(_path):
                # Bruker top directory
                self._brukerRoot = _path
                self._pdataDir = self._findFirstPdataDir()
                self._binaryData = None

            elif _path.is_dir() and _path.stem == self._PDATA and self._isBrukerTopDir(_path.parent):
                # Bruker/pdata directory
                self._brukerRoot = _path.parent
                self._pdataDir = self._findFirstPdataDir()
                self._binaryData = None

            elif self._isBrukerPdataDir(_path):
                # Bruker pdata 'proc'-directory
                self._pdataDir = _path
                self._brukerRoot = _path.parent.parent
                self._binaryData = None

            else:
                logger.debug2('"%s" does not define a valid path with Bruker data' % path)
                return None

            # check the directories; from now on, raise errors when not correct
            self._checkBrukerTopDir(self._brukerRoot)
            self._checkBrukerPdataDir(self._pdataDir)

            dimensionality = self._getDimensionality()
            if dimensionality not in self._processedDataFilesDict:
                raise RuntimeError('Undefined Bruker dimensionality "%s" in "%s"' % (dimensionality, self._pdataDir))

            if self._binaryData is None:
                self._binaryData = self._pdataDir / self._processedDataFilesDict[dimensionality][0]
            if not self._binaryData.exists():
                raise RuntimeError('Unable to find Bruker binary data "%s"' % self._binaryData)

        return super().setPath(self._binaryData, substituteSuffix=False)
    def readParameters(self):
        """Read the parameters from the Bruker directory
        Returns self
        """
        logger = getLogger()

        self.setDefaultParameters()

        self.dimensionCount = self._getDimensionality()

        try:
            # read acqus and procs
            self._readAcqus()
            self._readProcs()
        except Exception as es:
            errTxt = 'Reading acqu/proc parameters of %s; %s' % (self._brukerRoot, es)
            logger.error(errTxt)
            raise es

        try:
            _comment = self.acqus[0].get('_comments')
            if _comment is not None and isinstance(_comment, list):
                self.comment = '\n'.join(_comment)

            self.isBigEndian = self.procs[0].get('BYTORDP') == 1

            if 'NC_proc' in self.procs[0]:
                self.dataScale = pow(2, float(self.procs[0]['NC_proc']))
            else:
                self.dataScale = 1.0

            self.temperature = self.acqus[0]['TE']

            # Dimensional parameters
            for i in range(self.dimensionCount):
                # collapse the acqu and proc dicts into one, so we do not have to
                # remember where the parameter lives
                dimDict = dict()
                dimDict.update(self.acqus[i])
                dimDict.update(self.procs[i])

                # establish dimension type (time/frequency)
                if int(float(dimDict.get('FT_mod', 1))) == 0:
                    # point/time axis
                    self.dimensionTypes[i] = specLib.DIMENSION_TIME
                    self.measurementTypes[i] = specLib.MEASUREMENT_TYPE_TIME
                else:
                    # frequency axis
                    self.dimensionTypes[i] = specLib.DIMENSION_FREQUENCY
                    self.measurementTypes[i] = specLib.MEASUREMENT_TYPE_SHIFT

                self.pointCounts[i] = int(dimDict['SI'])
                if self.dimensionTypes[i] == specLib.DIMENSION_TIME:
                    tdeff = int(dimDict['TDeff'])
                    if tdeff > 0 and tdeff < self.pointCounts[i]:
                        self.pointCounts[i] = tdeff

                self.blockSizes[i] = int(dimDict['XDIM'])
                if self.blockSizes[i] == 0:
                    self.blockSizes[i] = self.pointCounts[i]
                else:
                    # for 1D data blockSizes can be > numPoints, which is wrong
                    # (comment from original V2-based code)
                    self.blockSizes[i] = min(self.blockSizes[i], self.pointCounts[i])

                self.isotopeCodes[i] = dimDict.get('AXNUC')
                self.axisLabels[i] = dimDict.get('AXNAME')

                self.spectralWidthsHz[i] = float(dimDict.get('SW_p', 1.0))  # SW in Hz processed (from proc file)
                self.spectrometerFrequencies[i] = float(dimDict.get('SFO1', dimDict.get('SF', 1.0)))
                self.referenceValues[i] = float(dimDict.get('OFFSET', 0.0))
                self.referencePoints[i] = float(dimDict.get('refPoint', 1.0))  # CCPN first point is defined as 1
                # origNumPoints[i] = int(dimDict.get('$FTSIZE', 0))
                # pointOffsets[i] = int(dimDict.get('$STSR', 0))
                self.phases0[i] = float(dimDict.get('PHC0', 0.0))
                self.phases1[i] = float(dimDict.get('PHC1', 0.0))

        except Exception as es:
            errTxt = 'Parsing parameters for %s; %s' % (self._brukerRoot, es)
            logger.error(errTxt)
            raise RuntimeError(errTxt)

        return super().readParameters()
    def _readAcqus(self):
        """Read the acqus files; fill the self.acqus list with a dict for every dimension
        """
        acquFiles = self._acqusFiles[0:self.dimensionCount]
        self.acqus = []
        for f in acquFiles:
            try:
                _params = read_jcamp(self._brukerRoot / f, encoding='Windows-1252')
            except UnicodeDecodeError:
                # Try again with utf-8
                _params = read_jcamp(self._brukerRoot / f, encoding='utf-8')
            self.acqus.append(_params)

    def _readProcs(self):
        """Read the procs files; fill the self.procs list with a dict for every dimension
        """
        procFiles = self._procFiles[0:self.dimensionCount]
        self.procs = []
        for f in procFiles:
            try:
                _params = read_jcamp(self._pdataDir / f, encoding='Windows-1252')
            except UnicodeDecodeError:
                # Try again with utf-8
                _params = read_jcamp(self._pdataDir / f, encoding='utf-8')
            self.procs.append(_params)
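
# A minimal usage sketch (not part of the original module). The dataset path
# '/data/myExperiment/1' is hypothetical; assuming the standard Bruker layout
# topdir/pdata/<n>, the class accepts the top directory, a pdata 'proc'-directory
# or a binary processed-data file, and resolves the remaining directories itself:
#
#   ds = BrukerSpectrumDataSource(path='/data/myExperiment/1')              # Bruker top directory
#   ds = BrukerSpectrumDataSource(path='/data/myExperiment/1/pdata/1')      # pdata 'proc'-directory
#   ds = BrukerSpectrumDataSource(path='/data/myExperiment/1/pdata/1/2rr')  # processed binary file
#   ds.readParameters()
#   print(ds.dimensionCount, ds.pointCounts)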
# Register this format
BrukerSpectrumDataSource._registerFormat()


import locale
import io

from nmrglue.fileio.bruker import parse_jcamp_line
def read_jcamp(filename: str, encoding: str = locale.getpreferredencoding()) -> dict:
    """
    Read a Bruker JCAMP-DX file into a dictionary.

    Creates two special dictionary keys _coreheader and _comments
    Bruker parameters "$FOO" are extracted into strings, floats or lists
    and assigned to dic["FOO"]

    Parameters
    ----------
    filename : str
        Filename of Bruker JCAMP-DX file.
    encoding : str
        Encoding of Bruker JCAMP-DX file. Defaults to the system default locale.

    Returns
    -------
    dic : dict
        Dictionary of parameters in file.

    See Also
    --------
    write_jcamp : Write a Bruker JCAMP-DX file.

    Notes
    -----
    This is not a fully functional JCAMP-DX reader, it is only intended
    to read Bruker acqus (and similar) files.
    """
    dic = {"_coreheader": [], "_comments": []}  # create empty dictionary

    with io.open(filename, 'r', encoding=encoding, errors='replace') as f:

        endOfFile = False
        lineIndex = 0
        while not endOfFile:  # loop until end of file is found

            lineIndex += 1
            line = f.readline().rstrip()  # read a line
            if line == '':  # end of file found
                endOfFile = True
                continue

            if line[:6] == "##END=":
                # print("End of file")
                endOfFile = True
                continue

            elif line[:2] == "$$":
                dic["_comments"].append(line)

            elif line[:2] == "##" and line[2] != "$":
                dic["_coreheader"].append(line)

            elif line[:3] == "##$":
                try:
                    key, value = parse_jcamp_line(line, f)
                    dic[key] = value
                except:
                    getLogger().warning("%s (line:%d): Unable to correctly parse %r" % (filename, lineIndex, line))

            else:
                getLogger().warning("%s (line:%d): Extraneous %r" % (filename, lineIndex, line))

    return dic
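
# A minimal sketch (not part of the original module) of calling read_jcamp directly;
# the acqus path below is hypothetical:
#
#   params = read_jcamp('/data/myExperiment/1/acqus', encoding='Windows-1252')
#   print(params.get('TD'), params.get('SW_h'), params.get('PULPROG'))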