"""
A class for managing xplor_nih structure calculation runs
IN_PROGRESS
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license",
)
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y"
)
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-25 15:13:49 +0000 (Fri, March 25, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: geertenv $"
__date__ = "$Date: 2020-02-10 10:28:41 +0000 (Thu, February 10, 2022) $"
#=========================================================================================
# Start of code
#=========================================================================================
import os
import shutil
import argparse
import string
import sys
import pathlib
import re
from pynmrstar import Entry, Saveframe, Loop
from datetime import datetime
from distutils.dir_util import copy_tree
from ccpn.util.traits.CcpNmrTraits import \
Unicode, Dict, List, V3ObjectList, V3Object, Bool, CPath, Int
from ccpn.util.Logging import getLogger
from ccpn.util.Path import aPath, Path
from ccpn.AnalysisStructure.lib.runManagers.RunManagerABC import RunManagerABC
from ccpn.framework.Preferences import getPreferences, \
XPLOR_NIH_PATH, TALOS_PATH, CYANA_PATH, ARIA_PATH
from ccpn.core.lib.ContextManagers import undoBlockWithoutSideBar, notificationEchoBlocking
class XplorNihRunManager(RunManagerABC):
    """Run manager for xplor_nih structure calculations.

    The manager covers two stages:

    - setup: create the run directory, write the NEF input file, copy the
      xplor_nih template files and generate the xplor/talos shell scripts
      (see setupCalculation); the scripts are then executed by the user;
    - processing: sort the generated files into sub-directories and build an
      ensemble of the lowest-energy structures (see processCalculation).
    """

    _RUN_TYPE = 'xplor_nih'
    _ENSEMBLE_FILE = 'ensemble.pdb'
    _HIGHEST_ENERGY = 'highestEnergy'
    _LOWEST_ENERGY = 'lowestEnergy'
    _VIOLATIONS_NEF_FILE = 'violations.nef'

    # (sub-directory name, glob patterns) pairs used by _cleanupXplor;
    # processed in this order, patterns are relative to the run directory
    _CLEANUP_LAYOUT = (
        ('pass2', ('pass2_*',)),
        ('pass3', ('pass3_*',)),
        ('fold', ('fold_*',)),
        ('xplor_log', ('xplor.log',)),
        ('xplor_scripts', ('*.py', '*.sh')),
        ('talos_files', ('*.tab',)),
        ('spectra_pass_files', ('*.peaks', '*Assignments', '*exceptions')),
    )

    # xplor_nih-specific
    # cleaning up and violation analysis
    nefViolationPath = CPath(allow_none=True, default_value=None).tag(
            info='The (relative) path of the violation file in Nef format'
            )
    cleanupDone = Bool(default_value=False).tag(
            info='flag to indicate if cleanup has been done'
            )
    violationDone = Bool(default_value=False).tag(
            info='flag to indicate if violation analysis has been done'
            )
    ensembleDone = Bool(default_value=False).tag(
            info='flag to indicate if ensemble has been generated'
            )

    # program-defs; these are not saved to json
    # "redirect" the RunManagerABC definitions
    _EXECUTABLE1 = XPLOR_NIH_PATH
    _xplorPath = RunManagerABC._executable1  # just a better name
    _EXECUTABLE2 = TALOS_PATH
    _talosnPath = RunManagerABC._executable2  # just a better name

    def __init__(self, project, **kwds):
        """Initialise the run manager.

        Logs a warning when the xplor executable exists but the directory
        with the xplor_nih NEF template files cannot be found.

        :param project: the project instance
        :param kwds: passed on to RunManagerABC.__init__
        """
        super().__init__(project=project, **kwds)

        #TODO: check for minimal xplor version
        if self._xplorPath is not None and self._xplorPath.exists() \
                and not self._xplorFilesDirectory.exists():
            getLogger().warning('xplor_nih Nef files directory not found at "%s"' % self._xplorFilesDirectory)

    @property
    def _xplorRootDirectory(self):
        """:return the xplor root directory (two levels above the executable);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorPath.parent.parent

    @property
    def _xplorFilesDirectory(self):
        """:return the xplor NEF template-files directory
        (<root>/eginput/pasd/nef); or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorRootDirectory / 'eginput' / 'pasd' / 'nef'

    @property
    def _xplorBinDirectory(self):
        """:return the xplor bin directory (<root>/bin);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorRootDirectory / 'bin'

    @property
    def _ens2pdb(self):
        """:return the path of the ens2pdb script in the xplor bin directory;
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorBinDirectory / 'ens2pdb'

    @property
    def _spectrumNames1(self) -> str:
        """:return a space-separated string with the plain spectrum names
        corresponding to the current peakLists
        """
        return ' '.join(pl.spectrum.name for pl in self.peakLists)

    @property
    def _spectrumNames(self) -> str:
        """:return a space-separated string with one '<spectrumName>`<serial>`'
        item per current peakList.
        Used for generating the xplor-script
        """
        return ' '.join(f'{pl.spectrum.name}`{pl.serial}`' for pl in self.peakLists)

    @property
    def _foldDirectory(self) -> Path:
        """:return the absolute path to the 'fold' directory as a Path instance"""
        return self.runPath / 'fold'

    def restoreState(self, runPath=None):
        """Restore the settings from json-file in directory runPath
        (defaults to the directory defined by current settings).

        After restoring, the calculationDone and processDone flags are
        patched up from what is found on disk / from the other flags.

        :param runPath: the path to the directory
        """
        super().restoreState(runPath=runPath)

        # patch to check if calculation was done; look for 'fold' files (or directory)
        if not self.calculationDone:
            # stop at the first match; no need to list the whole directory
            self.calculationDone = next(self.runPath.glob('fold*'), None) is not None

        self.processDone = self.processDone or (self.cleanupDone and self.ensembleDone)

    #-------------------------------------------------------------------------------

    def setupCalculation(self, useTimeStamp) -> Path:
        """Set up the xplor_nih structure calculation: create the run
        directory, the NEF input file, the template files and the scripts.
        The calculation itself is not started.

        :param useTimeStamp: currently not used by this method
                             (NOTE(review): confirm whether it should be
                             forwarded to fetchDirectory)
        :return The absolute path to the run directory
        :raises RuntimeError: if the xplor_nih or talosN path is undefined
        """
        logger = getLogger()

        if self._xplorPath is None:
            raise RuntimeError('Undefined xplor_nih path')
        if self._talosnPath is None:
            raise RuntimeError('Undefined talosN path')

        # Fetch the run directory (presumably created with a time stamp by
        # the base class - confirm)
        _runPath = self.fetchDirectory()

        # step 1; create the Nef input file
        _nefInputPath = self.writeNefInputFile()

        # step 2; copy files from xplor_nih directory to the working directory
        # NOTE: distutils is deprecated (PEP 632) and removed in Python 3.12;
        # shutil.copytree(..., dirs_exist_ok=True) is the replacement once
        # Python >= 3.8 can be assumed
        if self._xplorFilesDirectory.exists():
            copy_tree(self._xplorFilesDirectory.asString(), self.runPath.asString())
        else:
            logger.warning('Directory "%s" does not exists, cannot copy template files' % self._xplorFilesDirectory)

        # step 3: write xplor_nih script
        _xplorScript = self._writeXplorScript()
        logger.debug('Created script "%s"' % _xplorScript)

        # step 4: write the talosN script
        _talosScript = self._writeTalosScript()
        logger.debug('Created script "%s"' % _talosScript)

        logger.info('Setup calculation directory %s; please execute in script in terminal' % self.runPath)
        self.setupDone = True
        return _runPath

    def processCalculation(self):
        """Process the resulting Xplor-generated files: cleanup followed by
        ensemble generation; steps already flagged as done are skipped.

        :raises RuntimeError: if setup or calculation was not done
        """
        if not self.setupDone:
            raise RuntimeError('Setup was not done')
        if not self.calculationDone:
            raise RuntimeError('Calculation was not done')

        # do whatever is needed
        if not self.cleanupDone:
            self._cleanupXplor()
        if not self.ensembleDone:
            self._makeEnsemble()

    #-------------------------------------------------------------------------------
    # Helper code
    #-------------------------------------------------------------------------------

    def _cleanupXplor(self):
        """Clean up the xplor_nih files in the run directory by moving them
        into the sub-directories defined by _CLEANUP_LAYOUT; sets cleanupDone.
        """
        for dirName, patterns in self._CLEANUP_LAYOUT:
            # every target directory is fetched, even when nothing matches
            targetDir = self.runPath.fetchDir(dirName)
            for pattern in patterns:
                # snapshot the matches first: renaming changes the directory
                # that is being scanned
                for path in list(self.runPath.glob(pattern)):
                    path.rename(targetDir / path.name)

        self.cleanupDone = True

    def _processStatsFile(self) -> list:
        """Process the 'fold_##.sa.stats' file in the fold directory.

        Collects the first column of every 4-column line following the table
        header; stops at the first line that does not have 4 columns.

        :return a list of filenames for the lowest energy structures
        """
        statsFile = self._foldDirectory / 'fold_##.sa.stats'
        filenames = []
        foundStart = False

        with open(statsFile) as fp:
            for line in fp:
                if "energy RMSD RMSD" in line:
                    # table header; the entries start on the next line
                    foundStart = True
                    continue
                if not foundStart:
                    continue

                fields = line.split()
                if len(fields) != 4:
                    # end of the table
                    break
                filenames.append(fields[0])

        return filenames

    def _makeEnsembleFromPdbFiles(self, pdbFiles: list) -> Path:
        """Use the xplor_nih ens2pdb script to assemble the pdbFiles into an
        ensemble file (_ENSEMBLE_FILE) in the fold directory.

        The script is run with the fold directory as working directory, so
        the pdbFiles may be given relative to it. A failing script is logged
        but does not raise.

        :param pdbFiles: a list of pdbFiles
        :return the ensemble as a Path instance
        """
        # local import; subprocess is only needed here
        import subprocess

        ensemblePath = self._foldDirectory / self._ENSEMBLE_FILE
        # argument list rather than a shell string: safe for paths with spaces
        command = [str(self._ens2pdb)] + [str(elem) for elem in pdbFiles]

        # cwd= is used instead of os.chdir, so the working directory of the
        # calling process is never changed (and never left changed on error)
        with open(ensemblePath, 'w') as fp:
            try:
                result = subprocess.run(command, cwd=str(self._foldDirectory), stdout=fp)
            except OSError as es:
                getLogger().warning(f'Unable to run "{command[0]}": {es}')
            else:
                if result.returncode != 0:
                    getLogger().warning(f'"{command[0]}" exited with code {result.returncode}')

        return ensemblePath

    def _makeEnsemble(self):
        """Make an ensemble of the output files.

        Creates the ensemble file, copies the lowest-energy structure to
        'lowestEnergyStructure.cif' and sorts all fold_*.sa files (with their
        .cif and .viols companions) into the lowest/highest energy
        sub-directories of the fold directory; sets ensembleDone.

        :raises RuntimeError: if cleanup was not done, or if the stats file
                              does not list any structures
        """
        if not self.cleanupDone:
            raise RuntimeError('Trying to make ensemble before cleanup')

        # obtain a list of filenames of the (20) lowest energy structures
        filenames = self._processStatsFile()
        if not filenames:
            raise RuntimeError('No structures found in stats file; cannot make ensemble')

        # create the ensemble file
        ensemblePath = self._makeEnsembleFromPdbFiles(filenames)
        getLogger().debug(f'Created {ensemblePath}')

        # copy the lowest energy structure; the first stats entry is used
        lowest = (self._foldDirectory / filenames[0]) + '.cif'
        lowest2 = self._foldDirectory / 'lowestEnergyStructure.cif'
        lowest.copyfile(lowest2)

        # check / make subdirectories on foldDirectory to sort the ensemble members
        highEnergyDir = self._foldDirectory.fetchDir(self._HIGHEST_ENERGY)
        lowEnergyDir = self._foldDirectory.fetchDir(self._LOWEST_ENERGY)

        lowEnergyNames = set(filenames)
        # snapshot the matches first: files are moved while looping
        for path in list(self._foldDirectory.glob('fold_*.sa')):
            _dir = lowEnergyDir if path.name in lowEnergyNames else highEnergyDir
            # move the structure file and its companions
            for _suffix in ('', '.cif', '.viols'):
                _p = path + _suffix
                _p.rename(_dir / _p.name)

        self.ensembleDone = True

    def _runViolationAnalysis(self):
        """Run Gary's violation analysis routines on the lowest-energy
        structures, writing the result to _VIOLATIONS_NEF_FILE in the run
        directory; sets violationDone.
        """
        # imported here rather than at module level
        from ccpn.AnalysisStructure.lib.runManagers.analyseXplorViolations import analyseXplorViolations

        analyseXplorViolations(path=self._foldDirectory / self._LOWEST_ENERGY,
                               nefPath=self.runPath / self._VIOLATIONS_NEF_FILE)
        self.violationDone = True

    def _writeXplorScript(self, scriptPath=None) -> Path:
        """Generate (from template) and write a xplor-script; the script is
        made executable.

        :param scriptPath: optional relative path name for script; when given
                           it replaces self.scriptPath
        :return The absolute path to script as a Path instance
        """
        if scriptPath is not None:
            # optionally define a non-default relative path of the script; i.e. just the filename
            self.scriptPath = scriptPath

        _scriptPath = self.runPath / self.scriptPath
        with _scriptPath.open(mode='w') as fp:
            fp.write(self._getXplorScript())
        _scriptPath.chmod(0o755)
        return _scriptPath

    def _writeTalosScript(self) -> Path:
        """Generate (from template) and write the talos-script 'runTalos.sh'
        in the run directory; the script is made executable.

        :return The absolute path to script as a Path instance
        """
        _scriptPath = self.runPath / 'runTalos.sh'
        with _scriptPath.open(mode='w') as fp:
            fp.write(self._getTalosScript())
        _scriptPath.chmod(0o755)
        return _scriptPath

    def _getXplorScript(self):
        """:return The xplor_nih shell script as a string.

        The template is an f-string: single braces are filled in here, doubled
        braces end up as literal shell braces in the script. The template
        lines start at column 0 on purpose; they are written to file verbatim.
        """
        _parallel = f'-smp {self.numberOfCores}' if self.useParallel else ''

        return f"""#!/bin/sh
# An example of performing a PASD calculation from NEF-formatted input
#
# This is an executable script for bash, dash or Bourne-shell compatible
# shells. The full procedure can be run using the command
# sh README
#
alias xplor='{self._xplorPath}'
alias talosn='{self._talosnPath}'
# Procedure
#
# specify the prefix of the input NEF filename (portion without .nef)
name={self.nefInputPath.stem}
#
# 1) Generate Talos-N torsion angle restraints from chemical shifts.
./runTalos.sh ${{name}}.nef
# This generates ${{name}}_new.nef, which should be used for the
# the PASD structure calculation.
#
#
# 2) a snippet to list spectra in the NEF file:
pyXplor <<EOF
from nefTools import *
nef = readNEF('${{name}}.nef')
print("%25s %6s" % ("Spectrum Name", "Num Peaks"))
for n in getBlockNames(nef, 'spectrum'):
    print("%25s %6d" %(n, len(getBlock(nef,'spectrum',n).nef_peak.index)))
EOF
#
# Choose spectrum names to use
spectra='{self._spectrumNames}'
#
# 3) Run initMatch3d. The second argument given to the initMatch scripts is the
# name of the spectrum in the NEF file.
for spectrum in $spectra; do xplor initMatch3d.py ${{name}}.nef $spectrum; done
# initMatch3d.py should work for 3D spectra. It does not yet support 2D or
# 4D spectra- making this change is not difficult.
#
# 4) Run jointFilter - generate initial assignment likelihoods based on
# possible assignment connectivities.
xplor jointFilter.py ${{name}}.nef $spectra
# this generates *_pass2.peaks and *_pass2.shiftAssignments
#
# 5) First pass of structure calculation. Initially, assignment likelihoods
# are the jointFilter connectivity-based values. During the structure
# calculation these values are gradually switched over to being based solely
# on structure-based values.
xplor -parallel {_parallel} pass2.py ${{name}}_new.nef $spectra
#
# 6) update assignment likelihoods based on the 50 lowest energy structures
# from the previous calculation.
xplor summarize_pass2.py ${{name}}_new.nef $spectra
# this generates *_pass3.peaks and *_pass3.shiftAssignments
#
# 7) Second pass of structure calculation
xplor -parallel {_parallel} pass3.py ${{name}}_new.nef $spectra
#
# 8) generate final assignment likelihoods based on the 50 lowest energy
# pass3 structures.
xplor summarize_pass3.py ${{name}}_new.nef $spectra
#
# 9) write out a new NEF file using distance restraints from the PASD
# calculation and TalosN dihedral restraints.
xplor makeNEF.py ${{name}}_new.nef $spectra
# this creates the file out.nef, containing distance restraints from the
# PASD calculation in addition to the Talos-N dihedral restraints.
#
# 10) run structure calculation using the NEF restraints.
xplor -parallel {_parallel} fold.py -nef out.nef
# the results from the lowest energy 20 (of 100 total) structures is
# summarized in fold_##.sa.stats
# When run, the script validateFold.sh validates that the number of long-range
# assignments determined by PASD is sufficient, and that the precision of
# the structures calculated by fold.py is an acceptable (small) value.
"""

    def _getTalosScript(self):
        """:return The talos shell script as a string.

        The template is an f-string: single braces are filled in here, doubled
        braces end up as literal shell braces in the script.
        """
        return f"""#!/bin/sh
nefFilename=$1
TALOSN={self._talosnPath}
numProcessors=1 # I found that a value > 1 causes crashes, irreproducibility
if [ ! -f "$1" ]; then
echo "usage: $0 <file.nef>"
echo " creates file_new.nef"
exit 1
fi
if [ -z "`which $TALOSN 2>/dev/null`" ]; then
echo "Error: Could not find program named $TALOSN"
exit 1
fi
#Q: H or HN for amide proteins? - doesn't seem to matter
./genTalosNInput.py $nefFilename
#talos spits messages to stderr - redirect to stdout
$TALOSN -in ${{nefFilename}}.tab -np $numProcessors 2>&1
./talosToNEF.py $nefFilename pred.tab predAll.tab
"""
# Register the class at import time; register() is not defined in this class,
# so it is inherited (presumably from RunManagerABC - confirm what the
# registration is used for)
XplorNihRunManager.register()