
"""
Routines to parse the violations from the xplor_nih fold*.sa.viols files
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license",
               )
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y"
                 )
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-08 18:01:34 +0000 (Tue, March 08, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: gary $"
__date__ = "$Date: 2020-02-10 10:28:41 +0000 (Thu, February 10, 2022) $"
#=========================================================================================
# Start of code
#=========================================================================================

import os, sys, re
import datetime
import string

from pynmrstar import Entry, Saveframe, Loop

from ccpn.util.Logging import getLogger
from ccpn.util.Time import timeStamp


def analyseXplorViolations(path, nefPath):
    """Run Gary's violation-analysis routines on the files (extension '.sa.viols') in path,
    as obtained from an xplor_nih calculation.
    """
    paths = path.globList('fold_*.sa.viols')
    resultDict = parseViolations(paths)
    generateNefFile(resultDict=resultDict, nefPath=nefPath)


def collapse_name(names, depth=2):
    """Collapse a set of atom names that differ only in their trailing digits into a
    single NEF-style wildcard name; any name containing 'HN' is mapped to the NEF
    amide-proton name 'H'.
    """
    digit_by_depth = {}
    for target in names:
        for i in range(1, depth + 1):
            if target[-i].isdigit():
                digit_by_depth.setdefault(-i, set()).add(target[-i])
            else:
                break

    name_list = list(list(names)[0])
    for depth, unique_digits in digit_by_depth.items():
        if len(unique_digits) > 1:
            name_list[depth] = '%'

    result = ''.join(name_list)
    result = collapse_right_repeated_string(result)

    if 'HN' in result:
        result = 'H'

    return result
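

# Illustrative examples (hand-traced from the logic above):
#
#     collapse_name({'HB1', 'HB2', 'HB3'})              ->  'HB%'
#     collapse_name({'HG11', 'HG12', 'HG21', 'HG22'})   ->  'HG%'   (both trailing digits vary)
#     collapse_name({'HN'})                             ->  'H'     (amide proton renamed for NEF)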


def collapse_right_repeated_string(target, repeating='%'):
    """Collapse a trailing run of `repeating` characters down to a single one."""
    orig_length = len(target)

    result = target.rstrip(repeating)
    if len(result) < orig_length:
        result = result + repeating

    return result
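

# For example, 'HG%%' collapses to 'HG%', while a name with no trailing wildcard,
# e.g. 'HB2', is returned unchanged.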


def viol_to_nef(file_handle, model, file_path):  # , args):
    """Parse one Xplor-NIH '.sa.viols' violations file for a single model."""
    in_data = False
    results = {}
    range_split = re.compile(r'\.\.').split

    for line in file_handle:
        if 'NOE restraints in potential term' in line:
            restraint_number = None
            sub_restraint_id = 0
            pair_number = 0
            current_selections = None
            current_fields = None
            index = 0

            frame_name = line.split(':')[1].split('(')[0].strip()
            in_data = True

            lines = {}
            results[frame_name] = lines

            # skip the header block up to, and including, the dashed separator line
            while not line.startswith('-----'):
                line = next(file_handle)
            line = next(file_handle)

        if in_data and line.startswith('number of restraints'):
            in_data = False
            restraint_number = None
            pair_number = None
            index = 0

        if in_data and len(line.strip()) == 0:
            continue

        if in_data and '-- OR --' in line:
            sub_restraint_id += 1
            pair_number = 0
            continue

        if in_data:
            line = line.strip()
            line = _remove_violated_mark_if_present(line)

            new_restraint_number = _get_id(line, restraint_number)
            pair_number = 0 if new_restraint_number != restraint_number else pair_number
            current_pair_number = pair_number
            pair_number = pair_number + 1
            sub_restraint_id = 0 if new_restraint_number != restraint_number else sub_restraint_id
            restraint_number = new_restraint_number

            line = _remove_id(line)

            selections = _get_selections(line, 2, current_selections)
            current_selections = selections
            line = _remove_selections(line, 2)

            line = ' '.join(line.split('!'))
            fields = line.split()
            if not fields:
                fields = current_fields
            current_fields = fields

            calculated = float(fields[0])
            min_bound = float(range_split(fields[1])[0])
            max_bound = float(range_split(fields[1])[1])

            violation = 0.0
            if calculated < min_bound:
                violation = calculated - min_bound
            elif calculated > max_bound:
                violation = calculated - max_bound

            key = (index, model, new_restraint_number, sub_restraint_id, current_pair_number)

            comment = fields[3]
            restraint_list = comment.rstrip(string.digits)
            restraint_identifier = comment[len(restraint_list):]

            result = {'selection-1'     : selections[0],
                      'selection-2'     : selections[1],
                      'probability'     : '.',
                      'calc'            : calculated,
                      'min'             : min_bound,
                      'max'             : max_bound,
                      'dist'            : calculated,
                      'viol'            : violation,
                      'restraint-list'  : restraint_list,
                      'restraint-number': restraint_identifier,
                      'comment'         : fields[3],
                      'violation-file'  : file_path.parts[-1],
                      'structure-file'  : f'{file_path.stem}.cif'
                      }

            lines[key] = result
            index += 1

    # args.next_index = index
    return results
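

# The value returned by viol_to_nef is a dict keyed by the name of each NOE potential
# term found in the .viols file; each term maps a key of
#     (index, model, restraint_number, sub_restraint_id, pair_number)
# to a dict with the fields 'selection-1', 'selection-2', 'probability', 'calc', 'min',
# 'max', 'dist', 'viol', 'restraint-list', 'restraint-number', 'comment',
# 'violation-file' and 'structure-file'.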


def _get_id(line, current_id):
    putative_id = line.split()[0].strip()
    result = current_id
    if not putative_id.startswith('('):
        result = int(putative_id)
    return result


def _remove_violated_mark_if_present(line):
    result = line.strip().lstrip('*').strip()
    return result


def _remove_id(line):
    return line.lstrip('0123456789').strip()


def _line_active(line):
    fields = line.split()
    return fields[0] != '*'


def _get_selections(line, count, current_selections):
    results = []
    line = line.strip()
    for i in range(count):
        value, _, line = line.partition(')')
        results.append(value)
        line = line.strip()
    results = [selection.lstrip('(') for selection in results]

    selections = []
    for i, result in enumerate(results):
        selection = []
        if len(result.strip()):
            selections.append(selection)
            segid = result[:4]
            selection.append(segid)
            result = result[4:]
            selection.extend(result.split())
        else:
            selections.append(current_selections[i])

    return selections


def _remove_selections(line, count):
    line = line.strip()
    for i in range(count):
        value, _, line = line.partition(')')
        line = line.strip()
    return line


def _collapse_pairs(nef_entries):
    result = {}
    for entry_name, data in nef_entries.items():
        pair_selections = {}
        unique_entry_data = {}
        for i, (key, entry) in enumerate(data.items()):
            new_key = key[1:-1]
            pair_selections.setdefault(new_key, []).append((entry['selection-1'], entry['selection-2']))
            unique_entry_data[new_key] = entry

        new_entry = {}
        for i, (new_key, selection_data) in enumerate(pair_selections.items()):
            atoms_1 = set()
            atoms_2 = set()
            for selection_1, selection_2 in selection_data:
                atoms_1.add(selection_1[-1])
                atoms_2.add(selection_2[-1])

            selection_1 = selection_data[0][0]
            selection_2 = selection_data[0][1]

            nef_atom_name_1 = collapse_name(atoms_1)
            nef_atom_name_2 = collapse_name(atoms_2)

            selection_1[-1] = nef_atom_name_1
            selection_2[-1] = nef_atom_name_2

            unique_entry_data[new_key]['selection-1'] = selection_1
            unique_entry_data[new_key]['selection-2'] = selection_2

            new_entry[new_key] = unique_entry_data[new_key]

        result[entry_name] = new_entry

    return result


def data_as_nef(overall_result):
    """Convert the collected violation data into a NEF/NMR-STAR string using pynmrstar."""
    entry = Entry.from_scratch('default')
    UNUSED = '.'

    for table_name, table_data in overall_result.items():
        table_name = table_name.replace('noe-', '', 1)
        category = "ccpn_distance_restraint_violation_list"
        frame_code = f'{category}_{table_name}'

        frame = Saveframe.from_scratch(frame_code, category)
        entry.add_saveframe(frame)

        frame.add_tag("sf_category", "ccpn_distance_restraint_violation_list")
        frame.add_tag("sf_framecode", frame_code)
        frame.add_tag("nef_spectrum", f"nef_nmr_spectrum_{list(table_data.values())[0]['restraint-list']}")
        frame.add_tag("nef_restraint_list", f"{list(table_data.values())[0]['restraint-list']}")
        frame.add_tag("program", 'Xplor-NIH')
        frame.add_tag("program_version", UNUSED)
        frame.add_tag("protocol", 'marvin/pasd,refine')
        frame.add_tag("protocol_version", UNUSED)
        frame.add_tag("protocol_parameters", UNUSED)

        lp = Loop.from_scratch()
        tags = ('index', 'model_id', 'restraint_id', 'restraint_sub_id',
                'chain_code_1', 'sequence_code_1', 'residue_name_1', 'atom_name_1',
                'chain_code_2', 'sequence_code_2', 'residue_name_2', 'atom_name_2',
                'weight', 'probability',
                'lower_limit', 'upper_limit',
                'distance', 'violation',
                'violation_file', 'structure_file', 'structure_index',
                'nef_peak_id', 'comment'
                )
        lp.set_category('_ccpn_distance_restraint_violation')  # category)
        lp.add_tag(tags)

        for index, (indices, line_data) in enumerate(table_data.items()):
            indices = list(indices)
            indices = [index, *indices]
            indices[0] += 1
            indices[2] += 1
            indices[3] += 1

            # TODO: conversion of SEGID to chain ID maybe too crude
            selection_1 = line_data['selection-1']
            selection_1[0] = selection_1[0].strip()
            selection_2 = line_data['selection-2']
            selection_2[0] = selection_2[0].strip()

            data = [*indices,
                    *selection_1,
                    *selection_2,
                    1.0,
                    line_data['probability'],
                    line_data['min'],
                    line_data['max'],
                    # GST: this removes trailing rounding errors without loss of accuracy
                    round(line_data['dist'], 10),
                    round(line_data['viol'], 10),
                    line_data['violation-file'],
                    line_data['structure-file'],
                    1,
                    line_data['restraint-number'],
                    line_data['comment']
                    ]
            lp.add_data(data)

        frame.add_loop(lp)

    return str(entry)
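

# Quick sanity-check sketch (hypothetical usage, assuming pynmrstar's Entry.from_string):
# the string returned by data_as_nef is NMR-STAR text and can be re-parsed to inspect
# the violation loops, e.g.
#
#     nefText = data_as_nef(overall_result)
#     reparsed = Entry.from_string(nefText)
#     for saveframe in reparsed:
#         for loop in saveframe.loops:
#             print(saveframe.name, len(loop.data))    # rows in each violation loop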


def parseViolations(fileList) -> str:
    """Parse the files in fileList and return the results as a NEF-formatted string.
    """
    overall_result = {}
    filename_matcher = re.compile(r'fold_([0-9]+)\.sa\.viols')

    for path in fileList:
        try:
            model_number = int(filename_matcher.match(path.parts[-1]).group(1))
        except AttributeError:
            raise RuntimeError(
                    f"Couldn't find a model number in {path.parts[-1]} using matcher {filename_matcher.pattern}")

        if not path.is_file():
            raise RuntimeError(f'I need an input file, path was {path}')

        with open(path) as fh:
            entries = viol_to_nef(fh, model_number, path)  # , args)
            entries = _collapse_pairs(entries)
            for entry_name, entry in entries.items():
                overall_result.setdefault(entry_name, {}).update(entry)

    result = data_as_nef(overall_result)

    return result
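

# Expected file naming (as implied by filename_matcher above): each 'fold_<n>.sa.viols'
# file contributes model number <n>, so an input list such as
#     [.../fold_1.sa.viols, .../fold_2.sa.viols, ...]
# yields model_id values 1, 2, ... in the resulting NEF violation loops.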


def generateNefFile(resultDict, nefPath):
    """Write resultDict (the NEF text returned by parseViolations) to nefPath,
    inserting the appropriate nef_nmr_meta_data saveframe.
    """
    dateTimeStr = timeStamp()
    metaData = f"""data_default

save_nef_nmr_meta_data
   _nef_nmr_meta_data.sf_category      nef_nmr_meta_data
   _nef_nmr_meta_data.sf_framecode     nef_nmr_meta_data
   _nef_nmr_meta_data.format_name      nmr_exchange_format
   _nef_nmr_meta_data.format_version   1.1
   _nef_nmr_meta_data.program_name     CCPNprocessXplorNIHCalculation
   _nef_nmr_meta_data.program_version  3.2
   _nef_nmr_meta_data.creation_date    {dateTimeStr}
   _nef_nmr_meta_data.uuid             CCPNprocessXplorNIHCalculation_{dateTimeStr}

save_
"""
    # replace the bare 'data_default' header produced by data_as_nef with the metadata block;
    # str.replace returns a new string, so the result must be reassigned
    resultDict = resultDict.replace('data_default', metaData)

    with open(nefPath, 'w') as fh:
        fh.write(resultDict)
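

# End-to-end usage sketch (hypothetical paths; assumes a ccpn Path-like object exposing
# the globList() method used by analyseXplorViolations, e.g. aPath from ccpn.util.Path):
#
#     from ccpn.util.Path import aPath
#
#     runDir = aPath('/path/to/xplor/run')      # directory containing fold_*.sa.viols
#     analyseXplorViolations(runDir, runDir / 'violations.nef')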