A class for managing xplor_nih structure calculation runs

# Start of code

import os
import shutil
import argparse
import string
import sys
import pathlib
import re

from pynmrstar import Entry, Saveframe, Loop
from datetime import datetime

from distutils.dir_util import copy_tree

from ccpn.util.traits.CcpNmrTraits import \
    Unicode, Dict, List, V3ObjectList, V3Object, Bool, CPath, Int
from ccpn.util.Logging import getLogger
from ccpn.util.Path import aPath, Path

from ccpn.AnalysisStructure.lib.runManagers.RunManagerABC import RunManagerABC
from ccpn.framework.Preferences import getPreferences, \

from ccpn.core.lib.ContextManagers import undoBlockWithoutSideBar, notificationEchoBlocking

class XplorNihRunManager(RunManagerABC):
    """Class that maintains xplor_nih structure calculation functionality.

    Adds xplor_nih-specific state flags (cleanup / violation analysis / ensemble)
    to RunManagerABC and exposes the locations of the xplor_nih installation
    (root, template files, bin directory, ens2pdb script), all derived from the
    configured xplor_nih executable path.
    """

    _RUN_TYPE = 'xplor_nih'
    _ENSEMBLE_FILE = 'ensemble.pdb'
    _HIGHEST_ENERGY = 'highestEnergy'
    _LOWEST_ENERGY = 'lowestEnergy'
    _VIOLATIONS_NEF_FILE = 'violations.nef'

    # xplor_nih-specific
    # cleaning up and violation analysis
    nefViolationPath = CPath(allow_none=True, default_value=None).tag(
            info='The (relative) path of the violation file in Nef format'
            )
    cleanupDone = Bool(default_value=False).tag(info='flag to indicate if cleanup has been done')
    violationDone = Bool(default_value=False).tag(info='flag to indicate if violation analysis has been done')
    ensembleDone = Bool(default_value=False).tag(info='flag to indicate if ensemble has been generated')

    # program-defs; these are not saved to json
    # "redirect" the RunManagerABC definitions
    _EXECUTABLE1 = XPLOR_NIH_PATH
    _xplorPath = RunManagerABC._executable1  # just a better name
    _EXECUTABLE2 = TALOS_PATH
    _talosnPath = RunManagerABC._executable2  # just a better name

    def __init__(self, project, **kwds):
        """Initialise the run manager and warn if the xplor_nih Nef template
        directory is missing.

        :param project: the project instance
        :param kwds: passed on unchanged to RunManagerABC.__init__
        """
        super().__init__(project=project, **kwds)

        #TODO: check for minimal xplor version

        # Only warn when the executable itself is present; an undefined or missing
        # executable is reported later by setupCalculation.
        if self._xplorPath is not None and self._xplorPath.exists() and not self._xplorFilesDirectory.exists():
            getLogger().warning('xplor_nih Nef files directory not found at "%s"' % self._xplorFilesDirectory)

    @property
    def _xplorRootDirectory(self):
        """:return the xplor root directory (two levels above the executable);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorPath.parent.parent

    @property
    def _xplorFilesDirectory(self):
        """:return the xplorNef files directory (<root>/eginput/pasd/nef);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorRootDirectory / 'eginput' / 'pasd' / 'nef'

    @property
    def _xplorBinDirectory(self):
        """:return the xplor bin directory (<root>/bin);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorRootDirectory / 'bin'

    @property
    def _ens2pdb(self):
        """:return the ens2pdb script (<root>/bin/ens2pdb);
        or None if self._xplorPath is undefined
        """
        if self._xplorPath is None:
            return None
        return self._xplorBinDirectory / 'ens2pdb'

    @property
    def _spectrumNames1(self) -> str:
        """:return a string with the spectrum names corresponding to the current peakLists
        Used for generating the xplor-script
        """
        # NOTE(review): the element expression of this comprehension was missing in
        # the source as received (it read "[ for pl in self.peakLists]"); it has been
        # reconstructed from the docstring as the name of each peakList's spectrum.
        # Confirm against the upstream file.
        _spectrumNames = [pl.spectrum.name for pl in self.peakLists]
        # convert to a single string of names
        return ' '.join(_spectrumNames)

    @property
    def _spectrumNames(self) -> str:
        """:return a string with the back-quoted serials of the current peakLists,
        separated by spaces
        Used for generating the xplor-script
        """
        _spectrumNames = ['`'+str(pl.serial)+'`' for pl in self.peakLists]
        # convert to a single string of names
        return ' '.join(_spectrumNames)

    @property
    def _foldDirectory(self) -> Path:
        """:return the path to the 'fold' directory inside self.runPath as a Path instance"""
        return self.runPath / 'fold'
[docs] def restoreState(self, runPath=None): """Restore the settings from json-file in directory runPath (defaults to the directory defined by current settings). :param runPath: the path to the directory """ super().restoreState(runPath=runPath) # patch to check if calculation was done; look for 'fold' files (or directory) self.calculationDone = self.calculationDone or (len(list(self.runPath.glob('fold*'))) > 0) self.processDone = self.processDone or (self.cleanupDone and self.ensembleDone)
[docs] def setupCalculation(self, useTimeStamp) -> Path: """This sets up the xplor_nih structure calculation; :return The absolute path to the run directory """ logger = getLogger() if self._xplorPath is None: raise RuntimeError('Undefined xplor_nih path') if self._talosnPath is None: raise RuntimeError('Undefined talosN path') # Create a new directory with a time stamp _runPath = self.fetchDirectory() # step 1; create the Nef input file _nefInputPath = self.writeNefInputFile() # step 2; copy files from xplor_nih directory to the working directory if self._xplorFilesDirectory.exists(): copy_tree(self._xplorFilesDirectory.asString(), self.runPath.asString()) else: logger.warning('Directory "%s" does not exists, cannot copy template files' % self._xplorFilesDirectory) # step 3: write xplor_nih scipt _xplorScript = self._writeXplorScript() logger.debug('Created script "%s"' % _xplorScript) # step 4: write the talosN script _talosScript = self._writeTalosScript() logger.debug('Created script "%s"' % _talosScript)'Setup calculation directory %s; please execute in script in terminal' % self.runPath) self.setupDone = True return _runPath
[docs] def processCalculation(self): """Process the resulting Xplor-generated files; includes cleanup, violation analysis and ensemble generation """ if not self.setupDone: raise RuntimeError('Setup was not done') if not self.calculationDone: raise RuntimeError('Calculation was not done') # do whatever is needed if not self.cleanupDone: self._cleanupXplor() if not self.ensembleDone: self._makeEnsemble()
# NOTE(review): helper section of the run manager. It holds, in order:
#   _cleanupXplor, _processStatsFile, _makeEnsembleFromPdbFiles, _makeEnsemble,
#   _runViolationAnalysis, _writeXplorScript, _writeTalosScript,
#   _getXplorScript and _getTalosScript.
# The text below has lost its line breaks and several tokens, visible e.g. in
# "newPath = pass2 / path.rename(newPath)", "if in filenames:", "with'w') as fp:",
# "self._foldDirectory / ''" and the bare "./ ${{name}}.nef" / "xplor ${{name}}.nef"
# script lines. The missing file and script names cannot be recovered from this
# text, so the code is left untouched; restore it from the upstream file - TODO confirm.
#------------------------------------------------------------------------------- # Helper code #------------------------------------------------------------------------------- def _cleanupXplor(self): """Cleans up the xplor_nih files; puts in more structured directories """ pass2 = self.runPath.fetchDir('pass2') for path in self.runPath.glob('pass2_*'): newPath = pass2 / path.rename(newPath) pass3 = self.runPath.fetchDir('pass3') for path in self.runPath.glob('pass3_*'): newPath = pass3 / path.rename(newPath) fold_ = self.runPath.fetchDir('fold') for path in self.runPath.glob('fold_*'): newPath = fold_ / path.rename(newPath) xplor_log_ = self.runPath.fetchDir('xplor_log') for path in self.runPath.glob('xplor.log'): newPath = xplor_log_ / path.rename(newPath) xplor_scripts_ = self.runPath.fetchDir('xplor_scripts') for path in self.runPath.glob('*.py'): newPath = xplor_scripts_ / path.rename(newPath) for path in self.runPath.glob('*.sh'): newPath = xplor_scripts_ / path.rename(newPath) talos_ = self.runPath.fetchDir('talos_files') for path in self.runPath.glob('*.tab'): newPath = talos_ / path.rename(newPath) peaks_ = self.runPath.fetchDir('spectra_pass_files') for path in self.runPath.glob('*.peaks'): newPath = peaks_ / path.rename(newPath) for path in self.runPath.glob('*Assignments'): newPath = peaks_ / path.rename(newPath) for path in self.runPath.glob('*exceptions'): newPath = peaks_ / path.rename(newPath) self.cleanupDone = True def _processStatsFile(self) -> list: """process the '' file :return a list of filenames for the lowest energy structures """ # process the '' file statsFile = self._foldDirectory / '' filenames = [] foundStart = False with open(statsFile) as f: for line in f.readlines(): if "energy RMSD RMSD" in line: foundStart = True continue if foundStart: lineToRecord = line.split() if len(lineToRecord) != 4: break filenames.append(lineToRecord[0]) return filenames def _makeEnsembleFromPdbFiles(self, pdbFiles:list) -> Path: """Use the xplor_nih 
script to assemble the pdbFiles into an ensemble :param pdbFiles: a list of pdbFiles :return the ensemble as a Path instance """ cwd = os.getcwd() os.chdir(self._foldDirectory) # print(os.getcwd()) pdbFilesNames = ' '.join([str(elem) for elem in pdbFiles]) command = f'{self._ens2pdb} {pdbFilesNames} > {self._ENSEMBLE_FILE}' # print(command) os.system(command) os.chdir(cwd) return self._foldDirectory / self._ENSEMBLE_FILE def _makeEnsemble(self): """Make an ensemble of the output files """ if not self.cleanupDone: raise RuntimeError('Trying to make ensemble before cleanup') # obtain a list of filenames of the (20) lowest energy structures filenames = self._processStatsFile() # create the ensemble file ensemblePath = self._makeEnsembleFromPdbFiles(filenames) getLogger().debug(f'Created {ensemblePath}') # copy the lowest energy structure lowest = self._foldDirectory / filenames[0] + '.cif' lowest2 = self._foldDirectory / 'lowestEnergyStructure.cif' lowest.copyfile(lowest2) # check / make subdirectories on foldDirectory to sort the ensemble members highEnergyDir = self._foldDirectory.fetchDir(self._HIGHEST_ENERGY) lowEnergyDir = self._foldDirectory.fetchDir(self._LOWEST_ENERGY) for path in self._foldDirectory.glob('fold_*.sa'): if in filenames: # This is a low-energy structure _dir = lowEnergyDir else: _dir = highEnergyDir # move the files for _suffix in ('', '.cif', '.viols'): _p = path + _suffix newPath = _dir / _p.rename(newPath) self.ensembleDone = True def _runViolationAnalysis(self): """Run Garys violation analysis routines """ from ccpn.AnalysisStructure.lib.runManagers.analyseXplorViolations import analyseXplorViolations analyseXplorViolations(path = self._foldDirectory / self._LOWEST_ENERGY, nefPath= self.runPath / self._VIOLATIONS_NEF_FILE) self.violationDone = True def _writeXplorScript(self, scriptPath=None) -> Path: """Generate (from template) and write a xplor-script :param scriptPath: optional relative path name for script :return The absolute path to 
script as a Path instance """ if scriptPath is not None: # optionally define a non-default relative path of the script; i.e. just the filename self.scriptPath = scriptPath _scriptPath = self.runPath / self.scriptPath with'w') as fp: fp.write(self._getXplorScript()) _scriptPath.chmod(0o755) return _scriptPath def _writeTalosScript(self) -> Path: """Generate (from template) and write a talos-script :return The absolute path to script as a Path instance """ _scriptPath = self.runPath / '' with'w') as fp: fp.write(self._getTalosScript()) _scriptPath.chmod(0o755) return _scriptPath def _getXplorScript(self): """:return The xplor_nih script """ if self.useParallel == True: _parallel = f'-smp {self.numberOfCores}' else: _parallel = '' # print(self.parallel, self.parallelNumber, _parallel) return f"""#!/bin/sh # An example of performing a PASD calculation from NEF-formatted input # # This is an executable script for bash, dash or Bourne-shell compatible # shells. The full procedure can be run using the command # sh README # alias xplor='{self._xplorPath}' alias talosn='{self._talosnPath}' # Procedure # # specify the prefix of the input NEF filename (portion without .nef) name={self.nefInputPath.stem} # # 1) Generate Talos-N torsion angle restraints from chemical shifts. ./ ${{name}}.nef # This generates ${{name}}_new.nef, which should be used for the # the PASD structure calculation. # # # 2) a snippet to list spectra in the NEF file: pyXplor <<EOF from nefTools import * nef = readNEF('{{name}}.nef') print("%25s %6s" % ("Spectrum Name", "Num Peaks")) for n in getBlockNames(nef, 'spectrum'): print("%25s %6d" %(n, len(getBlock(nef,'spectrum',n).nef_peak.index))) EOF # # Choose spectrum names to use spectra='{self._spectrumNames}' # # 3) Run initMatch3d. The second argument given to the initMatch scripts is the # name of the spectrum in the NEF file. for spectrum in $spectra; do xplor ${{name}}.nef $spectrum; done # should work for 3D spectra. 
It does not yet support 2D or # 4D spectra- making this change is not difficult. # # 4) Run jointFilter - generate initial assignment likelihoods based on # possible assignment connectivities. xplor ${{name}}.nef $spectra # this generates *_pass2.peaks and *_pass2.shiftAssignments # # 5) First pass of structure calculation. Initially, assignment likelihoods # are the jointFilter connectivity-based values. During the structure # calculation these values are gradually switched over to being based solely # on structure-based values. xplor -parallel {_parallel} ${{name}}_new.nef $spectra # # 6) update assignment likelihoods based on the 50 lowest energy structures # from the previous calculation. xplor ${{name}}_new.nef $spectra # this generates *_pass3.peaks and *_pass3.shiftAssignments # # 7) Second pass of structure calculation xplor -parallel {_parallel} ${{name}}_new.nef $spectra # # 8) generate final assignment likelihoods based on the 50 lowest energy # pass3 structures. xplor ${{name}}_new.nef $spectra # # 9) write out a new NEF file using distance restraints from the PASD # calculation and TalosN dihedral restraints. xplor ${{name}}_new.nef $spectra # this creates the file out.nef, containing distance restraints from the # PASD calculation in addition to the Talos-N dihedral restraints. # # 10) run structure calculation using the NEF restraints. xplor -parallel {_parallel} -nef out.nef # the results from the lowest energy 20 (of 100 total) structures is # summarized in # When run, the script validates that the number of long-range # assignments determined by PASD is sufficient, and that the precision of # the structures calculated by is an acceptable (small) value. """ def _getTalosScript(self): """Get the talos script """ return f"""#!/bin/sh nefFilename=$1 TALOSN={self._talosnPath} numProcessors=1 # I found that a value > 1 causes crashes, irreproducibility if [ ! 
-f "$1" ]; then echo "usage: $0 <file.nef>" echo " creates file_new.nef" exit 1 fi if [ -z "`which $TALOSN 2>/dev/null`" ]; then echo "Error: Could not find program named $TALOSN" exit 1 fi #Q: H or HN for amide proteins? - doesn't seem to matter ./ $nefFilename #talos spits messages to stderr - redirect to stdout $TALOSN -in ${{nefFilename}}.tab -np $numProcessors 2>&1 ./ $nefFilename """