Source code for ccpn.AnalysisStructure.lib.runManagers.XplorNihRunManager

"""
A class for managing xplor_nih structure calculation runs

IN_PROGRESS
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license",
               )
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y"
                 )
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-25 15:13:49 +0000 (Fri, March 25, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: geertenv $"
__date__ = "$Date: 2020-02-10 10:28:41 +0000 (Thu, February 10, 2022) $"
#=========================================================================================
# Start of code
#=========================================================================================

import os
import shutil
import argparse
import string
import sys
import pathlib
import re

from pynmrstar import Entry, Saveframe, Loop
from datetime import datetime

from distutils.dir_util import copy_tree

from ccpn.util.traits.CcpNmrTraits import \
    Unicode, Dict, List, V3ObjectList, V3Object, Bool, CPath, Int
from ccpn.util.Logging import getLogger
from ccpn.util.Path import aPath, Path

from ccpn.AnalysisStructure.lib.runManagers.RunManagerABC import RunManagerABC
from ccpn.framework.Preferences import getPreferences, \
    XPLOR_NIH_PATH, TALOS_PATH, CYANA_PATH, ARIA_PATH

from ccpn.core.lib.ContextManagers import undoBlockWithoutSideBar, notificationEchoBlocking


class XplorNihRunManager(RunManagerABC):
    """
    Class that maintains xplor_nih structure calculation functionality.

    The manager works in three stages:

    - setupCalculation(): write the Nef input file, copy the xplor_nih template
      files into the run directory and write the xplor_nih and talosN shell scripts;
    - the user runs the generated script in a terminal;
    - processCalculation(): sort the generated files into sub-directories and
      build an ensemble from the lowest-energy structures.
    """
    _RUN_TYPE = 'xplor_nih'

    _ENSEMBLE_FILE = 'ensemble.pdb'
    _HIGHEST_ENERGY = 'highestEnergy'
    _LOWEST_ENERGY = 'lowestEnergy'
    _VIOLATIONS_NEF_FILE = 'violations.nef'

    # xplor_nih-specific
    # cleaning up and violation analysis
    nefViolationPath = CPath(allow_none=True, default_value=None).tag(
            info='The (relative) path of the violation file in Nef format'
            )
    cleanupDone = Bool(default_value=False).tag(
            info='flag to indicate if cleanup has been done'
            )
    violationDone = Bool(default_value=False).tag(
            info='flag to indicate if violation analysis has been done'
            )
    ensembleDone = Bool(default_value=False).tag(
            info='flag to indicate if ensemble has been generated'
            )

    # program-defs; these are not saved to json
    # "redirect" the RunManagerABC definitions
    _EXECUTABLE1 = XPLOR_NIH_PATH
    _xplorPath = RunManagerABC._executable1  # just a better name
    _EXECUTABLE2 = TALOS_PATH
    _talosnPath = RunManagerABC._executable2  # just a better name

    def __init__(self, project, **kwds):
        """
        :param project: the project instance
        :param kwds: passed on unchanged to RunManagerABC.__init__
        """
        super().__init__(project=project, **kwds)

        #TODO: check for minimal xplor version
        # Only warn when the executable itself is present but its template
        # directory is missing; an undefined path is reported by setupCalculation()
        if self._xplorPath is not None and self._xplorPath.exists() \
                and not self._xplorFilesDirectory.exists():
            getLogger().warning('xplor_nih Nef files directory not found at "%s"' %
                                self._xplorFilesDirectory)

    @property
    def _xplorRootDirectory(self):
        """:return the xplor root directory (two levels above the executable);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorPath.parent.parent

    @property
    def _xplorFilesDirectory(self):
        """:return the xplorNef files directory (<root>/eginput/pasd/nef);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorRootDirectory / 'eginput' / 'pasd' / 'nef'

    @property
    def _xplorBinDirectory(self):
        """:return the xplor bin directory (<root>/bin);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorRootDirectory / 'bin'

    @property
    def _ens2pdb(self):
        """:return the ens2pdb script (<root>/bin/ens2pdb);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorBinDirectory / 'ens2pdb'

    @property
    def _spectrumNames1(self) -> str:
        """:return a string with the (space-separated) spectrum names corresponding
        to the current peakLists; plain names, without the peakList serial
        """
        return ' '.join(pl.spectrum.name for pl in self.peakLists)

    @property
    def _spectrumNames(self) -> str:
        """:return a string with the (space-separated) spectrum names corresponding
        to the current peakLists; each name is followed by the peakList serial
        enclosed in back-ticks, i.e. name`serial`

        Used for generating the xplor-script
        """
        return ' '.join(pl.spectrum.name + '`' + str(pl.serial) + '`'
                        for pl in self.peakLists)

    @property
    def _foldDirectory(self) -> Path:
        """:return the absolute path to the 'fold' directory as a Path instance"""
        return self.runPath / 'fold'

    def restoreState(self, runPath=None):
        """Restore the settings from json-file in directory runPath (defaults to
        the directory defined by current settings).

        :param runPath: the path to the directory
        """
        super().restoreState(runPath=runPath)

        # patch to check if calculation was done; look for 'fold' files (or directory)
        self.calculationDone = self.calculationDone or any(self.runPath.glob('fold*'))
        self.processDone = self.processDone or (self.cleanupDone and self.ensembleDone)

    #-------------------------------------------------------------------------------

    def setupCalculation(self, useTimeStamp) -> Path:
        """This sets up the xplor_nih structure calculation;

        :param useTimeStamp: not used by this implementation
        :return The absolute path to the run directory
        :raises RuntimeError: if the xplor_nih or the talosN path is undefined
        """
        logger = getLogger()

        if self._xplorPath is None:
            raise RuntimeError('Undefined xplor_nih path')

        if self._talosnPath is None:
            raise RuntimeError('Undefined talosN path')

        # Create a new directory with a time stamp
        _runPath = self.fetchDirectory()

        # step 1; create the Nef input file
        self.writeNefInputFile()

        # step 2; copy files from xplor_nih directory to the working directory
        if self._xplorFilesDirectory.exists():
            copy_tree(self._xplorFilesDirectory.asString(), self.runPath.asString())
        else:
            logger.warning('Directory "%s" does not exists, cannot copy template files' %
                           self._xplorFilesDirectory)

        # step 3: write xplor_nih scipt
        _xplorScript = self._writeXplorScript()
        logger.debug('Created script "%s"' % _xplorScript)

        # step 4: write the talosN script
        _talosScript = self._writeTalosScript()
        logger.debug('Created script "%s"' % _talosScript)

        logger.info('Setup calculation directory %s; please execute in script in terminal' %
                    self.runPath)

        self.setupDone = True
        return _runPath

    def processCalculation(self):
        """Process the resulting Xplor-generated files; includes cleanup and
        ensemble generation. Steps already flagged as done are skipped.

        :raises RuntimeError: if setup or calculation has not been done
        """
        if not self.setupDone:
            raise RuntimeError('Setup was not done')

        if not self.calculationDone:
            raise RuntimeError('Calculation was not done')

        # do whatever is needed
        if not self.cleanupDone:
            self._cleanupXplor()

        if not self.ensembleDone:
            self._makeEnsemble()

    #-------------------------------------------------------------------------------
    # Helper code
    #-------------------------------------------------------------------------------

    def _cleanupXplor(self):
        """Cleans up the xplor_nih files; puts in more structured directories.

        Every entry of the layout table names a sub-directory of the run directory
        and the glob patterns (evaluated in the run directory) of the files that
        are moved into it.
        """
        layout = (
            ('pass2', ('pass2_*',)),
            ('pass3', ('pass3_*',)),
            ('fold', ('fold_*',)),
            ('xplor_log', ('xplor.log',)),
            ('xplor_scripts', ('*.py', '*.sh')),
            ('talos_files', ('*.tab',)),
            ('spectra_pass_files', ('*.peaks', '*Assignments', '*exceptions')),
            )

        for dirName, patterns in layout:
            targetDir = self.runPath.fetchDir(dirName)
            for pattern in patterns:
                # materialise the matches first; the directory is modified in the loop
                for path in list(self.runPath.glob(pattern)):
                    path.rename(targetDir / path.name)

        self.cleanupDone = True

    def _processStatsFile(self) -> list:
        """process the 'fold_##.sa.stats' file

        The names are taken from the first column of the lines following the
        header line containing "energy RMSD RMSD"; reading stops at the first
        line that does not consist of exactly four fields.

        :return a list of filenames for the lowest energy structures
        """
        statsFile = self._foldDirectory / 'fold_##.sa.stats'

        filenames = []
        foundStart = False
        with open(statsFile) as fp:
            for line in fp:
                if "energy RMSD RMSD" in line:
                    foundStart = True
                    continue

                if foundStart:
                    fields = line.split()
                    if len(fields) != 4:
                        break
                    filenames.append(fields[0])

        return filenames

    def _makeEnsembleFromPdbFiles(self, pdbFiles: list) -> Path:
        """Use the xplor_nih script to assemble the pdbFiles into an ensemble

        The ens2pdb script is executed through the shell from within the 'fold'
        directory; its standard output is redirected to the ensemble file.

        :param pdbFiles: a list of pdbFiles
        :return the ensemble as a Path instance
        """
        pdbFilesNames = ' '.join(str(elem) for elem in pdbFiles)
        command = f'{self._ens2pdb} {pdbFilesNames} > {self._ENSEMBLE_FILE}'

        cwd = os.getcwd()
        os.chdir(self._foldDirectory)
        try:
            status = os.system(command)
        finally:
            # always return to the original working directory, also on failure
            os.chdir(cwd)

        if status != 0:
            getLogger().warning(f'Command "{command}" returned non-zero status {status}')

        return self._foldDirectory / self._ENSEMBLE_FILE

    def _makeEnsemble(self):
        """Make an ensemble of the output files

        Creates the ensemble file, copies the lowest-energy structure to
        'lowestEnergyStructure.cif' and sorts the 'fold_*.sa' structures (with
        their '.cif' and '.viols' companions) into the lowest-energy and
        highest-energy sub-directories of the 'fold' directory.

        :raises RuntimeError: if cleanup was not done or the stats file lists no structures
        """
        if not self.cleanupDone:
            raise RuntimeError('Trying to make ensemble before cleanup')

        # obtain a list of filenames of the (20) lowest energy structures
        filenames = self._processStatsFile()
        if not filenames:
            raise RuntimeError('No lowest-energy structures found in the stats file')

        # create the ensemble file
        ensemblePath = self._makeEnsembleFromPdbFiles(filenames)
        getLogger().debug(f'Created {ensemblePath}')

        # copy the lowest energy structure
        lowest = self._foldDirectory / filenames[0] + '.cif'
        lowest2 = self._foldDirectory / 'lowestEnergyStructure.cif'
        lowest.copyfile(lowest2)

        # check / make subdirectories on foldDirectory to sort the ensemble members
        highEnergyDir = self._foldDirectory.fetchDir(self._HIGHEST_ENERGY)
        lowEnergyDir = self._foldDirectory.fetchDir(self._LOWEST_ENERGY)

        lowEnergyNames = set(filenames)
        for path in list(self._foldDirectory.glob('fold_*.sa')):
            # structures listed in the stats file are the low-energy ones
            _dir = lowEnergyDir if path.name in lowEnergyNames else highEnergyDir

            # move the structure file and its companion files
            for _suffix in ('', '.cif', '.viols'):
                _p = path + _suffix
                if not _p.exists():
                    # a missing companion file should not abort the sorting half-way
                    getLogger().debug(f'Skipping missing file {_p}')
                    continue
                _p.rename(_dir / _p.name)

        self.ensembleDone = True

    def _runViolationAnalysis(self):
        """Run Garys violation analysis routines on the lowest-energy structures;
        the result is written to the violations Nef file in the run directory
        """
        # local import, as in the original code
        from ccpn.AnalysisStructure.lib.runManagers.analyseXplorViolations import analyseXplorViolations

        analyseXplorViolations(path=self._foldDirectory / self._LOWEST_ENERGY,
                               nefPath=self.runPath / self._VIOLATIONS_NEF_FILE)
        self.violationDone = True

    def _writeXplorScript(self, scriptPath=None) -> Path:
        """Generate (from template) and write a xplor-script

        :param scriptPath: optional relative path name for script
        :return The absolute path to script as a Path instance
        """
        if scriptPath is not None:
            # optionally define a non-default relative path of the script; i.e. just the filename
            self.scriptPath = scriptPath

        _scriptPath = self.runPath / self.scriptPath
        with _scriptPath.open(mode='w') as fp:
            fp.write(self._getXplorScript())
        # make the script executable
        _scriptPath.chmod(0o755)

        return _scriptPath

    def _writeTalosScript(self) -> Path:
        """Generate (from template) and write a talos-script ('runTalos.sh')

        :return The absolute path to script as a Path instance
        """
        _scriptPath = self.runPath / 'runTalos.sh'
        with _scriptPath.open(mode='w') as fp:
            fp.write(self._getTalosScript())
        # make the script executable
        _scriptPath.chmod(0o755)

        return _scriptPath

    def _getXplorScript(self):
        """:return The xplor_nih script (a shell script) as a string

        Doubled braces in the template render as single braces, so '${{name}}'
        ends up as the shell variable reference '${name}' in the script.
        """
        # NOTE(review): when useParallel is False the generated commands still carry
        # the bare '-parallel' flag (without '-smp'); confirm this is intended
        _parallel = f'-smp {self.numberOfCores}' if self.useParallel else ''

        return f"""#!/bin/sh
# An example of performing a PASD calculation from NEF-formatted input
#
# This is an executable script for bash, dash or Bourne-shell compatible
# shells. The full procedure can be run using the command
#    sh README
#

alias xplor='{self._xplorPath}'
alias talosn='{self._talosnPath}'

# Procedure
#
# specify the prefix of the input NEF filename (portion without .nef)
name={self.nefInputPath.stem}

#
# 1) Generate Talos-N torsion angle restraints from chemical shifts.

./runTalos.sh ${{name}}.nef

# This generates ${{name}}_new.nef, which should be used for the
# the PASD structure calculation.
#
#
# 2) a snippet to list spectra in the NEF file:

pyXplor <<EOF
from nefTools import *
nef = readNEF('${{name}}.nef')
print("%25s %6s" % ("Spectrum Name", "Num Peaks"))
for n in getBlockNames(nef, 'spectrum'):
    print("%25s %6d" %(n, len(getBlock(nef,'spectrum',n).nef_peak.index)))
EOF

#
# Choose spectrum names to use
spectra='{self._spectrumNames}'

#
# 3) Run initMatch3d. The second argument given to the initMatch scripts is the
#    name of the spectrum in the NEF file.

for spectrum in $spectra; do
    xplor initMatch3d.py ${{name}}.nef $spectrum;
done

# initMatch3d.py should work for 3D spectra. It does not yet support 2D or
# 4D spectra- making this change is not difficult.
#
# 4) Run jointFilter - generate initial assignment likelihoods based on
#    possible assignment connectivities.

xplor jointFilter.py ${{name}}.nef $spectra

# this generates *_pass2.peaks and *_pass2.shiftAssignments
#
# 5) First pass of structure calculation. Initially, assignment likelihoods
#    are the jointFilter connectivity-based values. During the structure
#    calculation these values are gradually switched over to being based solely
#    on structure-based values.

xplor -parallel {_parallel} pass2.py ${{name}}_new.nef $spectra

#
# 6) update assignment likelihoods based on the 50 lowest energy structures
#    from the previous calculation.

xplor summarize_pass2.py ${{name}}_new.nef $spectra

# this generates *_pass3.peaks and *_pass3.shiftAssignments
#
# 7) Second pass of structure calculation

xplor -parallel {_parallel} pass3.py ${{name}}_new.nef $spectra

#
# 8) generate final assignment likelihoods based on the 50 lowest energy
#    pass3 structures.

xplor summarize_pass3.py ${{name}}_new.nef $spectra

#
# 9) write out a new NEF file using distance restraints from the PASD
#    calculation and TalosN dihedral restraints.

xplor makeNEF.py ${{name}}_new.nef $spectra

# this creates the file out.nef, containing distance restraints from the
# PASD calculation in addition to the Talos-N dihedral restraints.
#
# 10) run structure calculation using the NEF restraints.

xplor -parallel {_parallel} fold.py -nef out.nef

# the results from the lowest energy 20 (of 100 total) structures is
# summarized in fold_##.sa.stats

# When run, the script validateFold.sh validates that the number of long-range
# assignments determined by PASD is sufficient, and that the precision of
# the structures calculated by fold.py is an acceptable (small) value.
"""

    def _getTalosScript(self):
        """Get the talos script

        :return The talosN script (a shell script) as a string
        """
        return f"""#!/bin/sh

nefFilename=$1

TALOSN={self._talosnPath}
numProcessors=1 # I found that a value > 1 causes crashes, irreproducibility

if [ ! -f "$1" ]; then
    echo "usage: $0 <file.nef>"
    echo " creates file_new.nef"
    exit 1
fi

if [ -z "`which $TALOSN 2>/dev/null`" ]; then
    echo "Error: Could not find program named $TALOSN"
    exit 1
fi

#Q: H or HN for amide proteins? - doesn't seem to matter
./genTalosNInput.py $nefFilename

#talos spits messages to stderr - redirect to stdout
$TALOSN -in ${{nefFilename}}.tab -np $numProcessors 2>&1

./talosToNEF.py $nefFilename pred.tab predAll.tab
"""
# Register the run-manager class at import time. register() is not defined in
# XplorNihRunManager itself; presumably it is inherited from RunManagerABC and
# records the class under its _RUN_TYPE -- TODO confirm against RunManagerABC.
XplorNihRunManager.register()