"""Module Documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2017"
__credits__ = ("Wayne Boucher, Ed Brooksbank, Rasmus H Fogh, Luca Mureddu, Timothy J Ragan & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license",
"or ccpnmodel.ccpncore.memops.Credits.CcpnLicense for licence text")
__reference__ = ("For publications, please use reference from http://www.ccpn.ac.uk/v3-software/downloads/license",
"or ccpnmodel.ccpncore.memops.Credits.CcpNmrReference")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: CCPN $"
__dateModified__ = "$dateModified: 2017-07-07 16:33:15 +0100 (Fri, July 07, 2017) $"
__version__ = "$Revision: 3.0.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: wb104 $"
__date__ = "$Date: 2017-03-24 14:31:10 +0000 (Fri, March 24, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
import os, re, sys
from typing import Sequence
import numpy
# from ccpn.util.Common import checkIsotope
from ccpn.util.Logging import getLogger
# from memops.qtgui.MessageDialog import showError
from array import array
HEADER_SIZE = 512
HEADER_BYTE_SIZE = HEADER_SIZE*4
MAGIC_BYTES = [0x40, 0x16, 0x14, 0x7b]
NDIM_INDEX = 9
NPTS_INDEX = (99, 219, 15, 32)
COMPLEX_INDEX = (55, 56, 51, 54)
ORDER_INDEX = (24, 25, 26, 27)
SW_INDEX = (229, 100, 11, 29)
SF_INDEX = (218, 119, 10, 28)
ORIGIN_INDEX = (249, 101, 12, 30)
ISOTOPE_INDEX = (18, 16, 20, 22)
VALUE_INDEX = 199
NFILES_INDEX = 442
FILE_TYPE = 'NMRPipe'
[docs]def readParams(filePath, getFileCount=False):
#GWV: THIS IS SO UGLY: Retrurns different things depending on getFileCount!!!
dataFile = filePath
wordSize = 4
isFloatData = True
headerSize = HEADER_BYTE_SIZE
blockHeaderSize = 0
sampledValues = []
sampledSigmas = []
pulseProgram = None
dataScale = 1.0
fileObj = open(filePath, 'rb')
headData = fileObj.read(headerSize)
fileObj.close()
if len(headData) < headerSize:
msg = 'NMRPipe file %s appears to be truncated'
getLogger().warning(msg)
# showError('Error', msg % filePath)
return
floatVals = array('f')
floatVals.fromstring(headData)
if floatVals[0] != 0.0:
msg = 'NMRPipe file %s appears to be corrupted'
getLogger().warning(msg)
# showError('Error', msg % filePath)
return
# byte_order = [ 0x40, 0x16, 0x14, 0x7b ]
t = [ ord(chr(c)) for c in headData[8:12] ]
if t == MAGIC_BYTES:
isBigEndian = True
else:
t.reverse()
if t == MAGIC_BYTES:
isBigEndian = False
else:
msg = 'NMRPipe file %s appears to be corrupted'
getLogger().warning(msg)
# showError('Error', msg % filePath)
return
if isBigEndian is not (sys.byteorder == 'big'):
floatVals.byteswap()
ndim = int(floatVals[NDIM_INDEX])
if not (0 < ndim < 5):
msg = 'Can only handle NMRPipe files with between 1 and 4 dimensions'
getLogger().warning(msg)
# showError('Error', msg)
return
if getFileCount:
data = int(floatVals[NFILES_INDEX])
else:
numPoints = [0] * ndim
blockSizes = [0] * ndim
refPpms = [0.0] * ndim
refPoints = [0.0] * ndim
specWidths = [1000.0] * ndim
specFreqs = [500.0] * ndim
isotopes = [None] * ndim
# print indices
# print([int(floatVals[i])-1 for i in ORDER_INDEX])
# print([int(floatVals[i]) for i in COMPLEX_INDEX+(50,)])
for i in range(ndim):
j = int(floatVals[ORDER_INDEX[i]]) - 1
c = int(floatVals[COMPLEX_INDEX[i]])
if c == 0:
msg = 'NMRPipe data is complex in dim %d, can only cope with real data at present' % i
getLogger().warning(msg)
# showError('Error', msg % (i+1))
return
numPoints[i] = int(floatVals[NPTS_INDEX[i]])
if i == 0:
blockSizes[i] = numPoints[i]
else:
blockSizes[i] = 1
specWidths[i] = sw = floatVals[SW_INDEX[j]]
if sw == 0:
specWidths[i] = sw = 1000 # ?
specFreqs[i] = sf = floatVals[SF_INDEX[j]]
o = floatVals[ORIGIN_INDEX[j]]
refPpms[i] = (sw + o) / sf
refPoints[i] = 0
# GWV: removed isotopes parts as it actually picks up the NmrPipe labels with unintentional results, e.g.
# CA mapping to a Ca (calcium) isotope)
# Instead: the isotope codes are derived in the calling loadData functions on the basis of the spectral frequencies
#
# n = 4 * ISOTOPE_INDEX[j]
# isotope = headData[n:n+4].strip()
#
# # get rid of null termination
# m = isotope.find(0)
# if m >= 0:
# isotope = (isotope[:n])
# isotopes.append( checkIsotope(isotope.decode("utf-8")) )
#
# if isotope == 'ID': # ?
# isotopes[i] = None
# else:
# isotopes[i] = checkIsotope(isotope.decode("utf-8"))
data = (FILE_TYPE, dataFile, numPoints, blockSizes,
wordSize, isBigEndian, isFloatData,
headerSize, blockHeaderSize,
isotopes, specFreqs,
specWidths, refPoints, refPpms,
sampledValues, sampledSigmas,
pulseProgram, dataScale)
return data
def _guessFileTemplate(dataSource):
"""
##CCPNINTERNAL
Called from ccpnmodel.ccpncore.lib._ccp.nmr.Nmr.DataSource
"""
dataStore = dataSource.dataStore
if not dataStore:
return None
fullPath = dataStore.fullPath
fileCount = readParams(fullPath, getFileCount=True)
if fileCount == 1: # single file
return None
fileName = os.path.basename(fullPath)
directory = os.path.dirname(fullPath)
numDim = dataSource.numDim
if numDim < 3 or r'%' in fileName:
template = fileName
elif numDim == 3:
template = re.sub('\d\d\d', '%03d', fileName)
else: # numDim == 4
# don't understand NmrPipe templates (how many 0's are allowed) so a hack here for now
numPoints1, numPoints0 = [dataDim.numPoints for dataDim in dataSource.sortedDataDims()][-2:]
# had an example where numPoints0 < 100 but had %03d%03d
#if numPoints0 < 100:
# template = re.sub('\d\d\d\d\d', '%02d%03d', fileName)
#else:
# template = re.sub('\d\d\d\d\d\d', '%03d%03d', fileName)
for template in (re.sub('\d\d\d\d\d\d', '%03d%03d', fileName), ):
filePath = os.path.join(directory, template % (numPoints0, numPoints1))
if os.path.exists(filePath):
break
else:
template = re.sub('\d\d\d\d\d', '%02d%03d', fileName)
return os.path.join(directory, template)
def _getFileData(fullPath, numPoints, headerSize, dtype):
if len(numPoints) == 1:
n = numPoints[0]
else:
n = numPoints[0] * numPoints[1]
fp = open(fullPath, 'rb')
fp.seek(headerSize, 0)
data = numpy.fromfile(file=fp, dtype=dtype, count=n).reshape(numPoints) # data is in reverse order: y,x not x,y
fp.close()
if data.dtype != numpy.float32:
data = numpy.array(data, numpy.float32)
return data
[docs]def readData(dataSource):
dataStore = dataSource.dataStore
if not dataStore:
return None
if not hasattr(dataStore, 'template'):
dataStore.template = _guessFileTemplate(dataSource)
# TBD: for now assume that above works
fullPath = dataStore.fullPath
wordSize = dataStore.nByte
isBigEndian = dataStore.isBigEndian
isFloatData = dataStore.numberType == 'float'
headerSize = dataStore.headerSize
dtype = '%s%s%s' % (isBigEndian and '>' or '<', isFloatData and 'f' or 'i', wordSize)
numPoints = tuple(reversed([dataDim.numPoints for dataDim in dataSource.sortedDataDims()]))
numDim = dataSource.numDim
if numDim < 3:
data = _getFileData(fullPath, numPoints, headerSize, dtype)
elif numDim == 3:
yxPoints = numPoints[-2:]
data = numpy.zeros(numPoints, dtype='float32')
template = dataStore.template
for z in range(numPoints[0]):
zdata = _getFileData(template % (z+1), yxPoints, headerSize, dtype)
data[z,:,:] = zdata
elif numDim == 4:
yxPoints = numPoints[-2:]
data = numpy.zeros(numPoints, dtype='float32')
template = dataStore.template
for w in range(numPoints[0]):
for z in range(numPoints[1]):
planedata = _getFileData(template % (w+1, z+1), yxPoints, headerSize, dtype)
data[w, z, :, :] = planedata
else:
raise Exception('numDim > 4 not implemented yet')
return data
[docs]def getPlaneData(dataSource:'DataSource', position:Sequence=None, xDim:int=1, yDim:int=2):
xDim -= 1
yDim -= 1
if not hasattr(dataSource, 'data'):
dataSource.data = readData(dataSource)
numDim = dataSource.numDim
if not position:
position = numDim*[1]
dataDims = dataSource.sortedDataDims()
slices = numDim * [0]
for dim, dataDim in enumerate(dataDims):
if dim in (xDim, yDim):
numPoints = dataDim.numPoints
slices[numDim-dim-1] = slice(numPoints)
if dim == xDim:
xNumPoints = numPoints
else:
yNumPoints = numPoints
else:
slices[numDim-dim-1] = slice(position[dim]-1, position[dim])
data = dataSource.data[tuple(slices)]
if xDim > yDim:
# swap x and y
axes = numpy.arange(numDim)
axes[numDim-xDim-1] = numDim-yDim-1
axes[numDim-yDim-1] = numDim-xDim-1
data = data.transpose(axes)
data = data.reshape((yNumPoints, xNumPoints))
return data
[docs]def getSliceData(dataSource:'DataSource', position:Sequence=None, sliceDim:int=1):
sliceDim -= 1
if not hasattr(dataSource, 'data'):
dataSource.data = readData(dataSource)
numDim = dataSource.numDim
if not position:
position = numDim*[1]
dataDims = dataSource.sortedDataDims()
slices = numDim * [0]
for dim, dataDim in enumerate(dataDims):
if dim == sliceDim:
numPoints = dataDim.numPoints
slices[numDim-dim-1] = slice(numPoints)
else:
slices[numDim-dim-1] = slice(position[dim]-1, position[dim])
data = dataSource.data[tuple(slices)]
data = data.reshape((numPoints,))
return data
[docs]def getRegionData(dataSource:'DataSource', startPoint:Sequence[float], endPoint:Sequence[float]):
if not hasattr(dataSource, 'data'):
dataSource.data = readData(dataSource)
numDim = dataSource.numDim
slices = numDim * [0]
for dim in range(numDim):
slices[numDim-dim-1] = slice(startPoint[dim], endPoint[dim])
data = dataSource.data[tuple(slices)]
return data
def _makeNmrPipe2DHeader(self: 'DataSource', xDim: int = 1, yDim: int = 2):
"""Make and return a NmrPipe header for dimensions (xdim,ydim) as a float array of length HEADER_SIZE
"""
xDim -= 1
yDim -= 1
magicBytes = MAGIC_BYTES # [0x40, 0x16, 0x14, 0x7b]
if sys.byteorder != 'big':
magicBytes.reverse()
bytes = array('B') # unsigned char
for byte in [0x00]*8 + magicBytes + [0x00]*(HEADER_BYTE_SIZE-12):
bytes.append(byte)
# header data; initialise
header = array('f') # float
header.frombytes(bytes)
header[NDIM_INDEX] = 2
for n in range(2):
header[ORDER_INDEX[n]] = n + 1
dataDims = self.sortedDataDims()
dataDims2D = (dataDims[xDim], dataDims[yDim])
for n, dataDim in enumerate(dataDims2D):
dataDimRef = dataDim.getPrimaryDataDimRef() # will fail if a sampled data set
expDimRef = dataDimRef.expDimRef
header[NPTS_INDEX[n]] = dataDim.numPoints
header[COMPLEX_INDEX[n]] = 1 # real
header[SF_INDEX[n]] = expDimRef.sf
# note that spectralWidth is in ppm and NmrPipe wants it in Hz, so multiply by sf
header[SW_INDEX[n]] = dataDimRef.spectralWidth * expDimRef.sf
header[ORIGIN_INDEX[n]] = expDimRef.sf * (
dataDimRef.refValue + dataDimRef.refPoint * dataDimRef.spectralWidth / dataDim.numPoints - dataDimRef.spectralWidth)
if expDimRef.isotopeCodes:
isotopeCode = expDimRef.isotopeCodes[0][:4]
isotopeCode += (4 - len(isotopeCode)) * ' '
z = array('B') # unsigned char
# does this need swapping if not big endian??
z.fromstring(isotopeCode)
w = array('f')
w.frombytes(z)
header[ISOTOPE_INDEX[n]] = w[0]
return header