"""
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-28 15:14:57 +0100 (Mon, March 28, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
import functools
# import os
import typing
import operator
from typing import Sequence, Union, Optional, List
from collections import OrderedDict
# from time import time
from datetime import datetime
from ccpn.core._implementation.AbstractWrapperObject import AbstractWrapperObject
from ccpn.core._implementation.Updater import UPDATE_POST_PROJECT_INITIALISATION
from ccpn.core._implementation.V3CoreObjectABC import V3CoreObjectABC
from ccpn.core.lib import Pid
from ccpn.core.lib import Undo
from ccpn.core.lib.ProjectSaveHistory import getProjectSaveHistory, fetchProjectSaveHistory, newProjectSaveHistory
from ccpn.core.lib.ContextManagers import notificationBlanking, undoBlock, undoBlockWithoutSideBar, \
inactivity, logCommandManager
from ccpn.util import Logging
from ccpn.util.ExcelReader import ExcelReader
from ccpn.util.Path import aPath, Path
from ccpn.util.Logging import getLogger
from ccpn.util.decorators import logCommand
from ccpn.framework.lib.pipeline.PipelineBase import Pipeline
from ccpn.framework.PathsAndUrls import CCPN_EXTENSION
from ccpn.framework.PathsAndUrls import \
CCPN_ARCHIVES_DIRECTORY, \
CCPN_STATE_DIRECTORY, \
CCPN_DATA_DIRECTORY, \
CCPN_SPECTRA_DIRECTORY, \
CCPN_PLUGINS_DIRECTORY, \
CCPN_SCRIPTS_DIRECTORY, \
CCPN_SUB_DIRECTORIES
from ccpnmodel.ccpncore.api.ccp.nmr.Nmr import NmrProject as ApiNmrProject
from ccpnmodel.ccpncore.memops import Notifiers
from ccpnmodel.ccpncore.memops.ApiError import ApiError
from ccpnmodel.ccpncore.lib.molecule import MoleculeQuery
from ccpnmodel.ccpncore.lib.spectrum import NmrExpPrototype
from ccpnmodel.ccpncore.api.ccp.nmr.NmrExpPrototype import RefExperiment
# from ccpnmodel.ccpncore.lib import Constants
from ccpnmodel.ccpncore.lib.Io import Api as apiIo
# from ccpnmodel.ccpncore.lib.Io import Formats as ioFormats
from ccpnmodel.ccpncore.lib import ApiPath
from ccpnmodel.ccpncore.lib.Io import Fasta as fastaIo
from ccpnmodel.ccpncore.api.memops import Implementation
# TODO These should be merged with the same constants in CcpnNefIo
# (and likely those in ExportNefPopup) and moved elsewhere
CHAINS = 'chains'
CHEMICALSHIFTLISTS = 'chemicalShiftLists'
RESTRAINTTABLES = 'restraintTables'
PEAKLISTS = 'peakLists'
INTEGRALLISTS = 'integralLists'
MULTIPLETLISTS = 'multipletLists'
SAMPLES = 'samples'
SUBSTANCES = 'substances'
NMRCHAINS = 'nmrChains'
# DATASETS = 'dataSets'
STRUCTUREDATA = 'structureData'
COMPLEXES = 'complexes'
SPECTRUMGROUPS = 'spectrumGroups'
NOTES = 'notes'
PEAKCLUSTERS = 'peakClusters'
COLLECTIONS = 'collections'
[docs]class Project(AbstractWrapperObject):
""" The Project is the object that contains all data objects and serves as the hub for
navigating between them.
There are 15 top-level data objects directly within a project, of which 8 have child
objects of their own, e.g. Spectrum, Sample, Chain, NmrChain, ChemicalShiftList, DataSet
and StructureEnsemble. The child data objects are organised in a logical hierarchy; for example,
a Spectrum has PeakLists, which in turn, are made up of Peaks, whereas a Chain is made up of Residues,
which are made up of Atoms.
"""
#: Short class name, for PID.
shortClassName = 'PR'
# Attribute it necessary as subclasses must use superclass className
className = 'Project'
#: Name of plural link to instances of class
_pluralLinkName = 'projects'
#: List of child classes.
_childClasses = []
# All non-abstractWrapperClasses - filled in by
_allLinkedWrapperClasses = []
# Utility map - class shortName and longName to class.
_className2Class = {}
# 20211113:ED - added extra for searching the Collection objects as these are immutable
_classNameLower2Class = {}
_className2ClassList = []
_classNameLower2ClassList = []
# List of CCPN pre-registered api notifiers
# Format is (wrapperFuncName, parameterDict, apiClassName, apiFuncName)
#
# The function self.wrapperFuncName(**parameterDict) will be registered in the CCPN api notifier system
# api notifiers are set automatically, and are cleared by self._clearAllApiNotifiers and by self.delete()
#
# RESTRICTED. Direct access in core classes ONLY
_apiNotifiers = []
# Actions you can notify
_notifierActions = ('create', 'delete', 'rename', 'change')
# Qualified name of matching API class
_apiClassQualifiedName = ApiNmrProject._metaclass.qualifiedName()
# Top level mapping dictionaries:
# pid to object and ccpnData to object
#__slots__ = ['_pid2Obj', '_data2Obj']
# Needs to know this for restoring the GuiSpectrum Module. Could be removed after decoupling Gui and Data!
_isNew = None
#-----------------------------------------------------------------------------------------
# Attributes of the data structure (incomplete)
#-----------------------------------------------------------------------------------------
@property
def spectra(self):
"""STUB: hot-fixed later"""
return ()
@property
def peakLists(self):
"""STUB: hot-fixed later"""
return ()
@property
def peaks(self):
"""STUB: hot-fixed later"""
return ()
@property
def multipletLists(self):
"""STUB: hot-fixed later"""
return ()
@property
def integralLists(self):
"""STUB: hot-fixed later"""
return ()
@property
def spectrumViews(self):
"""STUB: hot-fixed later"""
return ()
@property
def chemicalShiftLists(self):
"""STUB: hot-fixed later"""
return None
@property
def chains(self):
"""STUB: hot-fixed later"""
return None
@property
def restraintTables(self):
"""STUB: hot-fixed later"""
return None
@property
def violationTables(self):
"""STUB: hot-fixed later"""
return None
@property
def samples(self):
"""STUB: hot-fixed later"""
return None
@property
def substances(self):
"""STUB: hot-fixed later"""
return None
@property
def nmrChains(self):
"""STUB: hot-fixed later"""
return None
@property
def structureData(self):
"""STUB: hot-fixed later"""
return None
@property
def complexes(self):
"""STUB: hot-fixed later"""
return None
@property
def spectrumGroups(self):
"""STUB: hot-fixed later"""
return None
@property
def notes(self):
"""STUB: hot-fixed later"""
return None
@property
def peakClusters(self):
"""STUB: hot-fixed later"""
return None
@property
def chemicalShifts(self):
"""Return the list of chemicalShifts in the project
"""
_shifts = []
for shiftList in self.chemicalShiftLists:
_shifts.extend(shiftList.chemicalShifts)
return _shifts
@property
def collections(self):
"""Return the list of collections in the project
"""
return self._collectionList.collections
@property
def _collectionData(self):
return self._wrappedData.collectionData
@_collectionData.setter
def _collectionData(self, value):
self._wrappedData.collectionData = value
#-----------------------------------------------------------------------------------------
# (Sub-)directories of the project
#-----------------------------------------------------------------------------------------
@property
def projectPath(self) -> Path:
"""
Convenience, as project.path (currently) does not yield a Path instance
:return: the absolute path to the project as a Path instance
"""
return aPath(self.path)
@property
def statePath(self) -> Path:
"""
:return: the absolute path to the state sub-directory of the current project
as a Path instance
"""
return self.projectPath / CCPN_STATE_DIRECTORY
@property
def pipelinePath(self) -> Path:
"""
:return: the absolute path to the state/pipeline sub-directory of
the current project as a Path instance
"""
return self.statePath.fetchDir(Pipeline.className)
@property
def dataPath(self) -> Path:
"""
:return: the absolute path to the data sub-directory of the current project
as a Path instance
"""
return self.projectPath / CCPN_DATA_DIRECTORY
@property
def spectraPath(self):
"""
:return: the absolute path to the data sub-directory of the current project
as a Path instance
"""
return self.projectPath / CCPN_SPECTRA_DIRECTORY
@property
def pluginDataPath(self) -> Path:
"""
:return: the absolute path to the data/plugins sub-directory of the
current project as a Path instance
"""
return self.projectPath / CCPN_PLUGINS_DIRECTORY
@property
def scriptsPath(self) -> Path:
"""
:return: the absolute path to the script sub-directory of the current project
as a Path instance
"""
return self.projectPath / CCPN_SCRIPTS_DIRECTORY
@property
def archivesPath(self) -> Path:
"""
:return: the absolute path to the archives sub-directory of the current project
as a Path instance
"""
return aPath(self.project.path) / CCPN_ARCHIVES_DIRECTORY
# TODO: define not using API
@property
def backupPath(self):
"""path to directory containing backup Project"""
backupRepository = self._wrappedData.parent.findFirstRepository(name="backup")
if not backupRepository:
self._logger.warning('Warning: no backup path set, so no backup done')
return
backupUrl = backupRepository.url
backupPath = backupUrl.path
return backupPath
#-----------------------------------------------------------------------------------------
# Implementation methods
#-----------------------------------------------------------------------------------------
def __init__(self, wrappedData: ApiNmrProject):
""" Special init for root (Project) object
NB Project is NOT complete before the _initProject function is run.
"""
if not isinstance(wrappedData, ApiNmrProject):
raise ValueError("Project initialised with %s, should be ccp.nmr.Nmr.NmrProject."
% wrappedData)
# Define linkage attributes
self._project = self
self._wrappedData = wrappedData
# self._appBase = None (delt with below)
# Reference to application; defined by Framework
self._application = None
# setup object handling dictionaries
self._data2Obj = {wrappedData: self}
self._pid2Obj = {}
self._id = wrappedData.name
self._resetIds()
# Set up notification machinery
# Active notifiers - saved for later cleanup. CORER APPLICAATION ONLY
self._activeNotifiers = []
# list or None. When set used to accumulate pending notifiers
# Optional list. Elements are (func, onceOnly, wrapperObject, optional oldPid)
self._pendingNotifications = []
# Notification suspension level - to allow for nested notification suspension
self._notificationSuspension = 0
# Notification blanking level - to allow for nested notification disabling
self._notificationBlanking = 0
# api 'change' notification blanking level - to allow for api 'change' call to be
# disabled in the _modifiedApiObject method.
# To be used with the apiNotificationBlanking contact manager; e.g.
# with apiNotificationBlanking():
# do something
#
self._apiNotificationBlanking = 0
# Wrapper level notifier tracking. APPLICATION ONLY
# {(className,action):OrderedDict(notifier:onceOnly)}
self._context2Notifiers = {}
# Special attributes:
self._implExperimentTypeMap = None
# reference to a ProjectSaveHistory instance; defined _newProject() or _loadProject()
self._saveHistory = None
# reference to the logger; defined in call to _initialiseProject())
self._logger = None
# reference to special v3 core lists without abstractWrapperObject
self._collectionList = None
self._checkProjectSubDirectories()
@property
def application(self):
return self._application
# GWV: 20181102: insert _appBase to retain consistency with current data loading models
_appBase = application
@property
def isNew(self):
"""Return true if the project is new
"""
# NOTE:ED - based on original check in _initProject
return self._wrappedData.root.isModified
@property
def isTemporary(self):
"""Return true if the project is temporary, i.e., not saved or updated.
"""
apiProject = self._wrappedData.root
return hasattr(apiProject, '_temporaryDirectory')
@property
def isModified(self):
"""Return true if any part of the project has been modified
"""
return self._wrappedData.root.isProjectModified()
@property
def _isUpgradedFromV2(self):
"""Return True if project was upgraded from V2
"""
return self._apiNmrProject.root._upgradedFromV2
@staticmethod
def _needsUpgrading(path) -> bool:
"""
Check if project defined by path needs upgrading
:param path: a ccpn project-path
:return: True/False
"""
from ccpn.framework.lib.DataLoaders.CcpNmrV2ProjectDataLoader import CcpNmrV2ProjectDataLoader
from ccpn.framework.lib.DataLoaders.CcpNmrV3ProjectDataLoader import CcpNmrV3ProjectDataLoader
# Check for V2 project; always needs upgrading
if (dataloader := CcpNmrV2ProjectDataLoader.checkForValidFormat(path)) is not None:
return True
if (dataloader := CcpNmrV3ProjectDataLoader.checkForValidFormat(path)) is None:
raise ValueError('Path "%s" does not define a valid ccpn project' % path)
if (projectHistory := getProjectSaveHistory(dataloader.path)):
# check whether the history exists
return projectHistory.lastSavedVersion <= '3.0.4'
return True
@property
def _data(self):
"""Get the contents of the data property from the model
CCPNInternal only
"""
return self._wrappedData.data
@_data.setter
def _data(self, value):
"""Set the contents of the data property from the model
CCPNInternal only
"""
if not isinstance(value, dict):
raise ValueError('value must be a dict')
self._wrappedData.data = value
def _checkProjectSubDirectories(self):
"""if need be, create all project subdirectories
"""
for dir in CCPN_SUB_DIRECTORIES:
self.projectPath.fetchDir(dir)
def _initialiseProject(self):
"""Complete initialisation of project,
set up logger and notifiers, and wrap underlying data
This routine is called from Framework, as some other machinery first needs to set up
(linkages, Current, notifiers and such)
"""
# The logger has already been set up when creating/loading the API project
# so just get it
self._logger = Logging.getLogger()
# Set up notifiers
self._registerPresetApiNotifiers()
# initialise, creating the children; pass in self as we are initialising
with inactivity(project=self):
self._restoreChildren()
# perform any required restoration of project not covered by children
self._restoreObject(self, self._wrappedData)
# we always have the default chemicalShift list
if len(self.chemicalShiftLists) == 0:
self.newChemicalShiftList(name='default')
# Call any updates
self._update()
@classmethod
def _restoreObject(cls, project, apiObj):
"""Process data that must always be performed after updating all children
"""
from ccpn.core._implementation.CollectionList import CollectionList
# create new collection table
project._collectionList = CollectionList(project=project)
# create new collections from table
project._collectionList._restoreObject(project, None)
# don't need to call super here
return project
def _close(self):
self.close()
[docs] def close(self):
"""Clean up the wrapper project previous to deleting or replacing
Cleanup includes wrapped data graphics objects (e.g. Window, Strip, ...)
"""
getLogger().info("Closing %s" % self.path)
# close any spectra
for sp in self.spectra:
sp._close()
# Remove undo stack:
self._resetUndo(maxWaypoints=0)
apiIo.cleanupProject(self)
self._clearAllApiNotifiers()
self.deleteAllNotifiers()
for tag in ('_data2Obj', '_pid2Obj'):
getattr(self, tag).clear()
# delattr(self,tag)
# del self._wrappedData
self.__dict__.clear()
def __repr__(self):
"""String representation"""
if self.isDeleted:
return "<Project:%s, isDeleted=True>" % self.name
else:
return "<Project:%s>" % self.name
def __str__(self):
"""String representation"""
if self.isDeleted:
return "<PR:%s, isDeleted=True>" % self.name
else:
return "<PR:%s>" % self.name
# CCPN properties
@property
def _key(self) -> str:
"""Project id: Globally unique identifier (guid)"""
return self._wrappedData.guid.translate(Pid.remapSeparators)
# _uniqueId: Some classes require a unique identifier per class
# use _uniqueId property defined in AbstractWrapperObject; values are maintained for project instance
def _queryNextUniqueIdValue(self, className) -> int:
"""query the next uniqueId for class className; does not increment its value
CCPNINTERNAL: used in NmrAtom on _uniqueName
"""
# _nextUniqueIdValues = {} # a (className, nexIdValue) dictionary
if not hasattr(self._wrappedData, '_nextUniqueIdValues'):
setattr(self._wrappedData, '_nextUniqueIdValues', {})
if self._wrappedData._nextUniqueIdValues is None:
self._wrappedData._nextUniqueIdValues = {}
nextUniqueId = self._wrappedData._nextUniqueIdValues.setdefault(className, 0)
return nextUniqueId
def _getNextUniqueIdValue(self, className) -> int:
"""Get the next uniqueId for class className; increments its value
CCPNINTERNAL: used in AbstractWrapper on __init__
"""
nextUniqueId = self._queryNextUniqueIdValue(className)
self._wrappedData._nextUniqueIdValues[className] += 1
return nextUniqueId
def _setNextUniqueIdValue(self, className, value):
"""Set the next uniqueId for class className
CCPNINTERNAL: should only be used in _restoreObject or Nef
"""
self._queryNextUniqueIdValue(className)
self._wrappedData._nextUniqueIdValues[className] = int(value)
@property
def _parent(self) -> AbstractWrapperObject:
"""Parent (containing) object."""
return None
[docs] def save(self, newPath: str = None, changeBackup: bool = True,
createFallback: bool = False, overwriteExisting: bool = False,
checkValid: bool = False, changeDataLocations: bool = False) -> bool:
"""Save project with all data, optionally to new location or with new name.
Unlike lower-level functions, this function ensures that data in high level caches are saved.
Return True if save succeeded otherwise return False (or throw error)
"""
# self._flushCachedData()
# Update the spectrum internal settings
for spectrum in self.spectra:
spectrum._saveObject()
# path is empty for save under the same name
if newPath:
# check validity of the newPath
newPath = aPath(newPath)
newPath.assureSuffix(CCPN_EXTENSION)
if newPath.exists() and not overwriteExisting:
raise ValueError('Cannot overwrite existing file "%s"' % newPath)
path = str(newPath)
if len(path) > 1024:
raise ValueError('There is a limit (1024) to the length of the path (%s)' % path)
_saveAs = True
else:
path = str(self.path)
_saveAs = False
try:
apiStatus = self._getAPIObjectsStatus()
if apiStatus.invalidObjects:
# if deleteInvalidObjects:
# delete here ...
# run save and apiStatus again. Ensure nothing else has been compromised on the deleting process
# else:
errorMsg = '\n '.join(apiStatus.invalidObjectsErrors)
getLogger().critical('Found compromised items. Project might be left in an invalid state. %s' % errorMsg)
# raise ValueError(error)
except Exception as es:
getLogger().warning('Error checking project status: %s' % str(es))
# don't check valid inside this routine as it is not optimised and only results in a crash. Use apiStatus object.
savedOk = apiIo.saveProject(self._wrappedData.root, newPath=path,
changeBackup=changeBackup, createFallback=createFallback,
overwriteExisting=overwriteExisting, checkValid=False,
changeDataLocations=changeDataLocations)
if savedOk:
self._resetIds()
# check for application and Gui; might not yet be there (e.g. on save of converted V2
# project)
if self.application and self.application.hasGui:
self.application.mainWindow.sideBar.setProjectName(self)
# store the version history in state subfolder json file
if _saveAs:
self._checkProjectSubDirectories()
self._saveHistory = newProjectSaveHistory(path)
self._saveHistory.addSaveRecord().save()
return savedOk
@property
def name(self) -> str:
"""name of Project"""
return self._wrappedData.root.name
@property
def path(self) -> str:
"""return absolute path to directory containing Project
"""
return apiIo.getRepositoryPath(self._wrappedData.root, 'userData')
[docs] @logCommand('project.')
def deleteObjects(self, *objs: typing.Sequence[typing.Union[str, Pid.Pid, AbstractWrapperObject]]):
"""Delete one or more objects, given as either objects or Pids
"""
getByPid = self.getByPid
objs = [getByPid(x) if isinstance(x, str) else x for x in objs]
with undoBlockWithoutSideBar():
for obj in objs:
if obj and not obj.isDeleted:
obj.delete()
@property
def _apiNmrProject(self) -> ApiNmrProject:
"""API equivalent to object: NmrProject"""
return self._wrappedData
# Undo machinery
@property
def _undo(self):
"""undo stack for Project. Implementation attribute"""
try:
result = self._wrappedData.root._undo
except:
result = None
return result
def _resetUndo(self, maxWaypoints: int = Undo.MAXUNDOWAYPOINTS,
maxOperations: int = Undo.MAXUNDOOPERATIONS,
debug: bool = False, application=None):
"""Reset undo stack, using passed-in parameters.
NB setting either parameter to 0 removes the undo stack."""
Undo.resetUndo(self._wrappedData.root, maxWaypoints=maxWaypoints,
maxOperations=maxOperations, debug=debug, application=application)
[docs] def newUndoPoint(self):
"""Set a point in the undo stack, you can undo/redo to """
undo = self._wrappedData.root._undo
if undo is None:
self._logger.warning("Trying to add undoPoint but undo is not initialised")
else:
undo.newWaypoint() # DO NOT CHANGE THIS ONE newWaypoint
self._logger.debug("Added undoPoint")
[docs] def blockWaypoints(self):
"""Block the setting of undo waypoints,
so that command echoing (_startCommandBLock) does not set waypoints
NB The programmer must GUARANTEE (try: ... finally) that waypoints are unblocked again"""
undo = self._wrappedData.root._undo
if undo is None:
self._logger.warning("Trying to block waypoints but undo is not initialised")
else:
undo.increaseWaypointBlocking()
self._logger.debug("Waypoint setting blocked")
[docs] def unblockWaypoints(self):
"""Block the setting of undo waypoints,
so that command echoing (_startCommandBLock) does not set waypoints
NB The programmer must GUARANTEE (try: ... finally) that waypoints are unblocked again"""
undo = self._wrappedData.root._undo
if undo is None:
self._logger.warning("Trying to unblock waypoints but undo is not initialised")
else:
undo.decreaseWaypointBlocking()
self._logger.debug("Waypoint setting unblocked")
# Should be removed:
@property
def _residueName2chemCompId(self) -> dict:
"""dict of {residueName:(molType,ccpCode)}"""
return MoleculeQuery.fetchStdResNameMap(self._wrappedData.root)
@property
def _experimentTypeMap(self) -> OrderedDict:
"""{dimensionCount : {sortedNucleusCodeTuple :
OrderedDict(experimentTypeSynonym : experimentTypeName)}}
dictionary
NB The OrderedDicts are ordered ad-hoc, with the most common experiments (hopefully) first
"""
# NB This is a hack, in order to rename experiments that we care particularly about
# This should disappear under refactoring
from ccpnmodel.ccpncore.lib.spectrum.NmrExpPrototype import priorityNameRemapping
# NBNB TODO FIXME fetchIsotopeRefExperimentMap should be merged with
# getExpClassificationDict output - we should NOT have two parallel dictionaries
result = self._implExperimentTypeMap
if result is None:
result = OrderedDict()
refExperimentMap = NmrExpPrototype.fetchIsotopeRefExperimentMap(self._apiNmrProject.root)
for nucleusCodes, refExperiments in refExperimentMap.items():
ndim = len(nucleusCodes)
dd1 = result.get(ndim, {})
result[ndim] = dd1
dd2 = dd1.get(nucleusCodes, OrderedDict())
dd1[nucleusCodes] = dd2
for refExperiment in refExperiments:
name = refExperiment.name
key = refExperiment.synonym or name
key = priorityNameRemapping.get(key, key)
dd2[key] = name
self._implExperimentTypeMap = result
#
return result
def _getReferenceExperimentFromType(self, value) -> Optional[RefExperiment]:
"""Search for a reference experiment matching name
"""
if value is None:
return
# nmrExpPrototype = self._wrappedData.root.findFirstNmrExpPrototype(name=value) # Why not findFirst instead of looping all sortedNmrExpPrototypes
for nmrExpPrototype in self._wrappedData.root.sortedNmrExpPrototypes():
for refExperiment in nmrExpPrototype.sortedRefExperiments():
if refExperiment.name == value:
return refExperiment
@property
def shiftAveraging(self):
"""Return shiftAveraging
"""
return self._wrappedData.shiftAveraging
@shiftAveraging.setter
def shiftAveraging(self, value):
"""Set shiftAveraging
"""
if not isinstance(value, bool):
raise TypeError('shiftAveraging must be True/False')
self._wrappedData.shiftAveraging = value
#===========================================================================================
# Notifiers system
#
# Old, API-level functions:
#
#===========================================================================================
@classmethod
def _setupApiNotifier(cls, func, apiClassOrName, apiFuncName, parameterDict=None):
"""Setting up API notifiers for subsequent registration on each new project
RESTRICTED. Use in core classes ONLY"""
tt = cls._apiNotifierParameters(func, apiClassOrName, apiFuncName,
parameterDict=parameterDict)
cls._apiNotifiers.append(tt)
@classmethod
def _apiNotifierParameters(cls, func, apiClassOrName, apiFuncName, parameterDict=None):
"""Define func as method of project and return API parameters for notifier setup
APPLICATION ONLY"""
if parameterDict is None:
parameterDict = {}
apiClassName = (apiClassOrName if isinstance(apiClassOrName, str)
else apiClassOrName._metaclass.qualifiedName())
dot = '_dot_'
wrapperFuncName = '_%s%s%s' % (func.__module__.replace('.', dot), dot, func.__name__)
setattr(Project, wrapperFuncName, func)
return (wrapperFuncName, parameterDict, apiClassName, apiFuncName)
def _registerApiNotifier(self, func, apiClassOrName, apiFuncName, parameterDict=None):
"""Register notifier for immediate action on current project (only)
func must be a function taking two parameters: the ccpn.core.Project and an Api object
matching apiClassOrName.
'apiFuncName' is either the name of an API modifier function (a setter, adder, remover),
in which case the notifier is triggered by this function
Or it is one of the following tags:
('', '__init__', 'postInit', 'preDelete', 'delete', 'startDeleteBlock', 'endDeleteBlock').
'' registers the notifier to any modifier function call ( setter, adder, remover),
__init__ and postInit triggers the notifier at the end of object creation, before resp.
after execution of postConstructorCode, the four delete-related tags
trigger notifiers at four different points in the deletion process
(see memops.Implementation.DataObject.delete() code for details).
ADVANCED, but free to use. Must be unregistered when any object referenced is deleted.
Use return value as input parameter for _unregisterApiNotifier (if desired)"""
tt = self.__class__._apiNotifierParameters(func, apiClassOrName, apiFuncName,
parameterDict=parameterDict)
return self._activateApiNotifier(*tt)
def _unregisterApiNotifier(self, notifierTuple):
"""Remove acxtive notifier from project. ADVANVED but free to use.
Use return value of _registerApiNotifier to identify the relevant notiifier"""
self._activeNotifiers.remove(notifierTuple)
Notifiers.unregisterNotify(*notifierTuple)
def _registerPresetApiNotifiers(self):
"""Register preset API notifiers. APPLCATION ONLY"""
for tt in self._apiNotifiers:
self._activateApiNotifier(*tt)
def _activateApiNotifier(self, wrapperFuncName, parameterDict, apiClassName, apiFuncName):
"""Activate API notifier. APPLICATION ONLY"""
notify = functools.partial(getattr(self, wrapperFuncName), **parameterDict)
notifierTuple = (notify, apiClassName, apiFuncName)
self._activeNotifiers.append(notifierTuple)
Notifiers.registerNotify(*notifierTuple)
#
return notifierTuple
def _clearAllApiNotifiers(self):
"""CLear all notifiers, previous to closing or deleting Project
APPLICATION ONLY
"""
while self._activeNotifiers:
tt = self._activeNotifiers.pop()
Notifiers.unregisterNotify(*tt)
#===========================================================================================
# Notifiers system
#
# New notifier system (Free for use in application code):
#
#===========================================================================================
[docs] def registerNotifier(self, className: str, target: str, func: typing.Callable[..., None],
parameterDict: dict = {}, onceOnly: bool = False) -> typing.Callable[..., None]:
"""
Register notifiers to be triggered when data change
:param str className: className of wrapper class to monitor (AbstractWrapperObject for 'all')
:param str target: can have the following values
*'create'* is called after the creation (or undeletion) of the object and its wrapper.
Notifier functions are called with the created V3 core object as the only parameter.
*'delete'* is called before the object is deleted
Notifier functions are called with the deleted to be deleted V3 core object as the only
parameter.
*'rename'* is called after the id and pid of an object has changed
Notifier functions are called with the renamed V3 core object and the old pid as parameters.
*'change'* when any object attribute changes value.
Notifier functions are called with the changed V3 core object as the only parameter.
rename and crosslink notifiers (see below) may also trigger change notifiers.
Any other value is interpreted as the name of a V3 core class, and the notifier
is triggered when a cross link (NOT a parent-child link) between the className and
the target class is modified
:param Callable func: The function to call when the notifier is triggered.
for actions 'create', 'delete' and 'change' the function is called with the object
created (deleted, undeleted, changed) as the only parameter
For action 'rename' the function is called with an additional parameter: oldPid,
the value of the pid before renaming.
If target is a second className, the function is called with the project as the only
parameter.
:param dict parameterDict: Parameters passed to the notifier function before execution.
This allows you to use the same function with different parameters in different contexts
:param bool onceOnly: If True, only one of multiple copies is executed
when notifiers are resumed after a suspension.
:return The registered notifier (which can be passed to removeNotifier or duplicateNotifier)
"""
if target in self._notifierActions:
tt = (className, target)
else:
# This is right, it just looks strange. But if target is not an action it is
# another className, and if so the names must be sorted.
tt = tuple(sorted([className, target]))
od = self._context2Notifiers.setdefault(tt, OrderedDict())
if parameterDict:
notifier = functools.partial(func, **parameterDict)
else:
notifier = func
if od.get(notifier) is None:
od[notifier] = onceOnly
else:
raise TypeError("Coding error - notifier %s set twice for %s,%s "
% (notifier, className, target))
#
return notifier
[docs] def unRegisterNotifier(self, className: str, target: str, notifier: typing.Callable[..., None]):
"""Unregister the notifier from this className, and target"""
if target in self._notifierActions:
tt = (className, target)
else:
# This is right, it just looks strange. But if target is not an action it is
# another className, and if so the names must be sorted.
tt = tuple(sorted([className, target]))
try:
if hasattr(self, '_context2Notifiers'):
od = self._context2Notifiers.get((tt), {})
del od[notifier]
except KeyError:
self._logger.warning("Attempt to unregister unknown notifier %s for %s" % (notifier, (className, target)))
[docs] def removeNotifier(self, notifier: typing.Callable[..., None]):
"""Unregister the the notifier from all places where it appears."""
found = False
for od in self._context2Notifiers.values():
if notifier in od:
del od[notifier]
found = True
if not found:
self._logger.warning("Attempt to remove unknown notifier: %s" % notifier)
[docs] def blankNotification(self):
"""Disable notifiers temporarily
e.g. to disable 'object modified' notifiers during object creation
Caller is responsible to make sure necessary notifiers are called, and to unblank after use"""
self._notificationBlanking += 1
[docs] def unblankNotification(self):
"""Resume notifier execution after blanking"""
self._notificationBlanking -= 1
if self._notificationBlanking < 0:
raise TypeError("Code Error: _notificationBlanking below zero!")
[docs] def suspendNotification(self):
"""Suspend notifier execution and accumulate notifiers for later execution"""
# return
# TODO suspension temporarily disabled
self._notificationSuspension += 1
[docs] def resumeNotification(self):
"""Execute accumulated notifiers and resume immediate notifier execution"""
# return
# TODO suspension temporarily disabled
# This was broken at one point, and we never found time to fix it
# It is a time-saving measure, allowing you to e.g. execute a
# peak-created notifier only once when creating hundreds of peaks in one operation
if self._notificationSuspension > 1:
self._notificationSuspension -= 1
else:
# Should not be necessary, but in this way we never get below 0 no matter what errors happen
self._notificationSuspension = 0
scheduledNotifiers = set()
executeNotifications = []
pendingNotifications = self._pendingNotifications
while pendingNotifications:
notification = pendingNotifications.pop()
notifier = notification[0]
onceOnly = notification[1]
if onceOnly:
# check whether the match pair, (function, object) is in the found set
matchNotifier = (notifier, notification[2])
if matchNotifier not in scheduledNotifiers:
scheduledNotifiers.add(matchNotifier)
# append the function call (function, object, *params)
executeNotifications.append((notifier, notification[2:]))
# if notifier not in scheduledNotifiers:
# scheduledNotifiers.add(notifier)
# executeNotifications.append((notifier, notification[2:]))
else:
executeNotifications.append((notifier, notification[2:]))
#
for notifier, params in reversed(executeNotifications):
notifier(*params)
# Standard notified functions.
# RESTRICTED. Use in core classes ONLY
def _startDeleteCommandBlock(self, *allWrappedData):
"""Call startCommandBlock for wrapper object delete. Implementation only
If commented: _activateApiNotifier fails
Used by the preset Api notifiers populated for self._apiNotifiers;
have _newApiObject, _startDeleteCommandBlock, _finaliseApiDelete, _endDeleteCommandBlock, _finaliseApiUnDelete
and _modifiedApiObject for each V3 class
Initialised from _linkWrapperObjects in AbstractWrapperObject.py:954
"""
undo = self._undo
if undo is not None:
# set undo step
undo.newWaypoint() # DO NOT CHANGE THIS
undo.increaseWaypointBlocking()
# self.suspendNotification()
def _endDeleteCommandBlock(self, *dummyWrappedData):
"""End block for delete command echoing
MUST be paired with _startDeleteCommandBlock call - use try ... finally to ensure both are called
"""
undo = self._undo
if undo is not None:
# self.resumeNotification()
undo.decreaseWaypointBlocking()
def _newApiObject(self, wrappedData, cls: AbstractWrapperObject):
"""Create new wrapper object of class cls, associated with wrappedData.
and call creation notifiers"""
factoryFunction = cls._factoryFunction
if factoryFunction is None:
result = cls(self, wrappedData)
else:
# Necessary for classes where you need to instantiate a subclass instead
result = self._data2Obj.get(wrappedData)
# There are cases where _newApiObject is registered twice,
# when two wrapper classes share the same API class
# (Peak,Integral; PeakList, IntegralList)
# In those cases only the notifiers are done the second time
if result is None:
result = factoryFunction(self, wrappedData)
#
result._finaliseAction('create')
def _modifiedApiObject(self, wrappedData):
""" call object-has-changed notifiers
"""
if self._apiNotificationBlanking == 0:
obj = self._data2Obj[wrappedData]
obj._finaliseAction('change')
def _finaliseApiDelete(self, wrappedData):
"""Clean up after object deletion
"""
if not wrappedData.isDeleted:
raise ValueError("_finaliseApiDelete called before wrapped data are deleted: %s" % wrappedData)
# get object
obj = self._data2Obj.get(wrappedData)
if not obj:
# NOTE:ED - it shouldn't get here but occasionally it does :|
getLogger().warning(f'_finaliseApiDelete: no V3 object for {wrappedData}')
else:
# obj._finaliseAction('delete') # GWV: 20181127: now as notify('delete') decorator on delete method
# remove from wrapped2Obj
del self._data2Obj[wrappedData]
# remove from pid2Obj
del self._pid2Obj[obj.shortClassName][obj._id]
# Mark object as obviously deleted, and set up for undeletion
obj._id += '-Deleted'
wrappedData._oldWrapperObject = obj
obj._wrappedData = None
def _finaliseApiUnDelete(self, wrappedData):
"""restore undeleted wrapper object, and call creation notifiers,
same as _newObject"""
if wrappedData.isDeleted:
raise ValueError("_finaliseApiUnDelete called before wrapped data are deleted: %s" % wrappedData)
try:
oldWrapperObject = wrappedData._oldWrapperObject
except AttributeError:
raise ApiError("Wrapper object to undelete wrongly set up - lacks _oldWrapperObject attribute")
# put back in from wrapped2Obj
self._data2Obj[wrappedData] = oldWrapperObject
if oldWrapperObject._id.endswith('-Deleted'):
oldWrapperObject._id = oldWrapperObject._id[:-8]
# put back in pid2Obj
self._pid2Obj[oldWrapperObject.shortClassName][oldWrapperObject._id] = oldWrapperObject
# Restore object to pre-undeletion state
del wrappedData._oldWrapperObject
oldWrapperObject._wrappedData = wrappedData
# oldWrapperObject._finaliseAction('create') # EJB: 20211119: now as notify('delete') decorator on delete method
def _notifyRelatedApiObject(self, wrappedData, pathToObject: str, action: str):
""" call 'action' type notifiers for getattribute(pathToObject)(wrappedData)
pathToObject is a navigation path (may contain dots) and must yield an API object
or an iterable of API objects"""
if self._apiNotificationBlanking == 0:
getDataObj = self._data2Obj.get
target = operator.attrgetter(pathToObject)(wrappedData)
# GWV: a bit too much for now; should be the highest debug level only
#self._project._logger.debug('%s: %s.%s = %s'
# % (action, wrappedData, pathToObject, target))
if not target:
pass
elif hasattr(target, '_metaclass'):
if not target.isDeleted:
# Hack. This is an API object - only if exists
getDataObj(target)._finaliseAction(action)
else:
# This must be an iterable
for obj in target:
if not obj.isDeleted:
getDataObj(obj)._finaliseAction(action)
# def _finaliseApiRename(self, wrappedData):
# """Reset Finalise rename - called from API object (for API notifiers)
# """
# # Should be handled by decorators
# if self._apiNotificationBlanking == 0:
# getLogger().debug2(f'*** SHOULD THIS BE CALLED? {self._data2Obj.get(wrappedData)}')
# # obj = self._data2Obj.get(wrappedData)
# # obj._finaliseAction('rename')
def _finalisePid2Obj(self, obj, action):
"""New/Delete object to the general dict for v3 pids
"""
# update pid:object mapping dictionary
dd = self._pid2Obj.setdefault(obj.className, self._pid2Obj.setdefault(obj.shortClassName, {}))
# set/delete on action
if action == 'create':
dd[obj.id] = obj
elif action == 'delete':
# should never fail
del dd[obj.id]
def _modifiedLink(self, dummy, classNames: typing.Tuple[str, str]):
""" call link-has-changed notifiers
The notifier function called must have the signature
func(project, **parameterDict)
NB
1) calls to this function must be set up explicitly in the wrapper for each crosslink
2) This function is only called when the link is changed explicitly, not when
a linked object is created or deleted"""
if self._notificationBlanking:
return
# get object
className, target = tuple(sorted(classNames))
# self._doNotification(classNames[0], classNames[1], self)
# NB 'AbstractWrapperObject' not currently in use (Sep 2016), but kept for future needs
iterator = (self._context2Notifiers.setdefault((name, target), OrderedDict())
for name in (className, 'AbstractWrapperObject'))
# Notification suspension postpones notifications (and removes duplicates)
# It is broken and has been disabled for a long time.
# There may be some accumulated bugs when (if)it is turned back on.
if False and self._notificationSuspension:
ll = self._pendingNotifications
for dd in iterator:
for notifier, onceOnly in dd.items():
ll.append((notifier, onceOnly, self))
else:
for dd in iterator:
for notifier in dd:
notifier(self)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Library functions
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _updateApiDataUrl(self, path):
"""Update the data url to path; for legacy purposes
"""
# Reset remoteData DataStores to match path
if path is None or len(path) == 0:
getLogger().debug('_updateApiDataUrl: invalid path %r' % path)
return
path = aPath(path)
if not path.exists():
getLogger().debug('_updateApiDataUrl: path %r does not exist' % path)
return
memopsRoot = self._wrappedData.root
dataUrl = memopsRoot.findFirstDataLocationStore(name='standard').findFirstDataUrl(
name='remoteData'
)
dataUrl.url = Implementation.Url(path=str(path.as_posix()))
def _getAPIObjectsStatus(self, completeScan=False, includeDefaultChildren=False):
"""
Scan all API objects and check their validity.
Parameters:
completeScan: bool, True to perform a complete validity check of all found API objects
includeDefaultChildren: bool, False to exclude default objects for inspection such as
ChemComps and associated, nmrExpPrototypes etc.See _APIStatus._excludedChildren
for the full list of exclusions.
Return: the API Status object. See _APIStatus for full description
"""
getLogger().info('Validating Project integrity...')
from ccpn.core._implementation.APIStatus import APIStatus
root = self._apiNmrProject.root
apiStatus = APIStatus(apiObj=root, completeScan=completeScan, includeDefaultChildren=includeDefaultChildren)
return apiStatus
def _update(self):
"""Call the _updateObject method on all objects, including self
"""
self._updateObject(UPDATE_POST_PROJECT_INITIALISATION)
objs = self._getAllDecendants()
for obj in objs:
obj._updateObject(UPDATE_POST_PROJECT_INITIALISATION)
[docs] @logCommand('project.')
def exportNef(self, path: str = None,
overwriteExisting: bool = False,
skipPrefixes: typing.Sequence[str] = (),
expandSelection: bool = True,
includeOrphans: bool = False,
pidList: typing.Sequence[str] = None):
"""
Export selected contents of the project to a Nef file.
skipPrefixes: ( 'ccpn', ..., <str> )
expandSelection: <bool>
includeOrphans: <bool>
Include 'ccpn' in the skipPrefixes list will exclude ccpn specific items from the file
expandSelection = True will include all data from the project, this may not be data that
is not defined in the Nef standard.
includeOrphans = True will include chemicalShifts that have no peak assignments (orphans)
PidList is a list of <str>, e.g. 'NC:@-', obtained from the objects to be included.
The Nef file may also contain further dependent items associated with the pidList.
:param path: output path and filename
:param skipPrefixes: items to skip
:param expandSelection: expand the selection
:param includeOrphans: include chemicalShift orphans
:param pidList: a list of pids
"""
from ccpn.framework.lib.ccpnNef import CcpnNefIo
with undoBlock():
with notificationBlanking():
CcpnNefIo.exportNef(self, path,
overwriteExisting=overwriteExisting,
skipPrefixes=skipPrefixes,
expandSelection=expandSelection,
includeOrphans=includeOrphans,
pidList=pidList)
[docs] @staticmethod
def isCoreObject(obj) -> bool:
"""Return True if obj is a core ccpn object
"""
return isinstance(obj, (AbstractWrapperObject, V3CoreObjectABC))
[docs] @staticmethod
def isCoreClass(klass) -> bool:
"""Return True if type(klass) is a core ccpn object type
"""
return isinstance(klass, type) and issubclass(klass, (AbstractWrapperObject, V3CoreObjectABC))
#===========================================================================================
# Data loaders
#===========================================================================================
[docs] def loadData(self, path: (str, Path)) -> list:
"""Just a stub for backward compatibility
"""
return self.application.loadData(path)
def _loadFastaFile(self, path: (str, Path)) -> list:
"""Load Fasta sequence(s) from file into Wrapper project
CCPNINTERNAL: called from FastDataLoader
"""
with logCommandManager('application.', 'loadData', path):
sequences = fastaIo.parseFastaFile(path)
chains = []
for sequence in sequences:
newChain = self.createChain(sequence=sequence[1], compoundName=sequence[0],
molType='protein')
chains.append(newChain)
#
return chains
def _loadPdbFile(self, path: (str, Path)) -> list:
"""Load data from pdb file path into new StructureEnsemble object(s)
CCPNINTERNAL: called from pdb dataLoader
"""
from ccpn.util.StructureData import EnsembleData
with logCommandManager('application.', 'loadData', path):
path = aPath(path)
name = path.basename
ensemble = EnsembleData.from_pdb(str(path))
se = self.newStructureEnsemble(name=name, data=ensemble)
return [se]
def _loadTextFile(self, path: (str, Path)) -> list:
"""Load text from file path into new Note object
CCPNINTERNAL: called from text dataLoader
"""
with logCommandManager('application.', 'loadData', path):
path = aPath(path)
name = path.basename
with path.open('r') as fp:
# cannot do read() as we want one string
text = ''.join(line for line in fp.readlines())
note = self.newNote(name=name, text=text)
return [note]
def _loadLayout(self, path: (str, Path), subType: str):
# this is a GUI only function call. Please move to the appropriate location on 3.1
self.application._restoreLayoutFromFile(path)
def _loadExcelFile(self, path: (str, Path)) -> list:
"""Load data from a Excel file.
:returns list of loaded objects (awaiting adjust ment of excelReader)
CCPNINTERNAL: used in Excel data loader
"""
with logCommandManager('application.', 'loadData', path):
with undoBlock():
reader = ExcelReader(project=self, excelPath=str(path))
result = reader.load()
return result
#===========================================================================================
# End data loaders
#===========================================================================================
[docs] def getObjectsByPartialId(self, className: str,
idStartsWith: str) -> typing.List[AbstractWrapperObject]:
"""get objects from class name / shortName and the start of the ID.
The function does NOT interrogate the API level, which makes it faster in a
number fo cases, e.g. for NmrResidues"""
dd = self._pid2Obj.get(className)
if dd:
# NB the _pid2Obj entry is set in the object init.
# The relevant dictionary may therefore be missing if no object has yet been created
result = [tt[1] for tt in dd.items() if tt[0].startswith(idStartsWith)]
else:
result = None
#
return result
[docs] def getObjectsById(self, className: str, id: str) -> typing.List[AbstractWrapperObject]:
"""get objects from class name / shortName and the start of the ID.
The function does NOT interrogate the API level, which makes it faster in a
number fo cases, e.g. for NmrResidues"""
dd = self._pid2Obj.get(className)
if dd:
# NB the _pid2Obj entry is set in the object init.
# The relevant dictionary may therefore be missing if no object has yet been created
result = [tt[1] for tt in dd.items() if tt[0] == id]
else:
result = None
#
return result
[docs] def getObjectsByPids(self, pids: list):
"""Optimise method to get all found objects from a list of pids. Remove any None.
Warning: do not use with zip"""
return list(filter(None, map(lambda x: self.getByPid(x) if isinstance(x, str) else str(x), pids)))
[docs] def getByPids(self, pids: list):
"""Optimise method to get all found objects from a list of pids. Remove any None.
"""
objs = [self.getByPid(pid) if isinstance(pid, str) else pid for pid in pids]
return list(filter(lambda obj: self.isCoreObject(obj), objs))
[docs] def getPidsByObjects(self, objs: list):
"""Optimise method to get all found pids from a list of objects. Remove any None.
Warning: do not use with zip"""
return list(filter(None, map(lambda x: x.pid if isinstance(x, AbstractWrapperObject) else None, objs)))
[docs] def getCcpCodeData(self, ccpCode, molType=None, atomType=None):
"""Get the CcpCode for molType/AtomType
"""
from ccpnmodel.ccpncore.lib.assignment.ChemicalShift import getCcpCodeData
return getCcpCodeData(self._apiNmrProject, ccpCode, molType='protein', atomType=atomType)
# def packageProject(self, filePrefix, includeBackups=True, includeLogs=True):
# """Package the project
# """
# from ccpnmodel.ccpncore.lib.Io import Api as apiIo
#
# return apiIo.packageProject(self._wrappedData.parent, filePrefix,
# includeBackups=includeBackups, includeLogs=includeLogs)
[docs] @logCommand('project.')
def saveToArchive(self) -> Path:
"""Make new time-stamped archive of project
:return path to .tgz archive file as a Path object
"""
from ccpn.core.lib.ProjectArchiver import ProjectArchiver
archiver = ProjectArchiver(projectPath=self.path)
archivePath = archiver.makeArchive()
getLogger().info('==> Project archived to %s' % archivePath)
return archivePath
def _getArchivePaths(self) -> List[Path]:
""":return list of archives from archive directory
CCPNINTERAL: used in GuiMainWindow
"""
from ccpn.core.lib.ProjectArchiver import ProjectArchiver
archiver = ProjectArchiver(projectPath=self.project.path)
return archiver.archives
[docs] def getExperimentClassifications(self) -> dict:
"""Get a dictionary of dictionaries of dimensionCount:sortedNuclei:ExperimentClassification named tuples.
"""
# NOTE:ED - better than being in spectrumLib but still needs moving
from ccpnmodel.ccpncore.lib.spectrum.NmrExpPrototype import getExpClassificationDict
return getExpClassificationDict(self._wrappedData)
#===========================================================================================
# new<Object> and other methods
# Call appropriate routines in their respective locations
#===========================================================================================
[docs] def newMark(self, colour: str, positions: Sequence[float], axisCodes: Sequence[str],
style: str = 'simple', units: Sequence[str] = (), labels: Sequence[str] = ()):
"""
To be depreciated in next version; use mainWindow.newMark() instead
"""
return self.application.mainWindow.newMark(colour=colour, positions=positions,
axisCodes=axisCodes, style=style,
units=units, labels=labels)
[docs] @logCommand('project.')
def newSpectrum(self, path: str, name: str = None):
"""Creation of new Spectrum defined by path; optionally set name.
"""
from ccpn.core.Spectrum import _newSpectrum
return _newSpectrum(self, path=path, name=name)
# @logCommand('project.')
# def createDummySpectrum(self, axisCodes: Sequence[str], name=None, chemicalShiftList=None):
# """
# Make dummy spectrum from isotopeCodes list - without data and with default parameters.
#
# :param axisCodes:
# :param name:
# :param chemicalShiftList:
# :return: a new Spectrum instance.
# """
# raise NotImplementedError('Use Project.newEmptySpectrum')
[docs] @logCommand('project.')
def newEmptySpectrum(self, isotopeCodes: Sequence[str], dimensionCount=None, name='emptySpectrum', path=None,
**parameters):
"""
Make new Empty spectrum from isotopeCodes list - without data and with default parameters.
default parameters are defined in: SpectrumDataSourceABC.isotopeDefaultDataDict
:param isotopeCodes: a tuple/list of isotope codes that define the dimensions; e.g. ('1H', '13C')
:dimensionCount: an optional dimensionCount parameter; default derived from len(isotopeCodes)
:param name: the name of the resulting spectrum
:param path: an optional path to be stored with the Spectrum instance
:param **parameters: optional spectrum (parameter, value) pairs
:return: a new Spectrum instance.
"""
from ccpn.core.Spectrum import _newEmptySpectrum
return _newEmptySpectrum(self, isotopeCodes=isotopeCodes, dimensionCount=dimensionCount,
name=name, path=path, **parameters)
[docs] @logCommand('project.')
def newHdf5Spectrum(self, isotopeCodes: Sequence[str], name='hdf5Spectrum', path=None, **parameters):
"""
Make new hdf5 spectrum from isotopeCodes list - without data and with default parameters.
:param isotopeCodes:
:param name: name of the spectrum
:param path: optional path (autogenerated from name when None; resulting file will be in
data/spectra folder of the project)
:param **parameters: optional spectrum (parameter, value) pairs
:return: a new Spectrum instance.
"""
from ccpn.core.Spectrum import _newHdf5Spectrum
return _newHdf5Spectrum(self, isotopeCodes=isotopeCodes, name=name, path=path, **parameters)
[docs] @logCommand('project.')
def newNmrChain(self, shortName: str = None, isConnected: bool = False, label: str = '?',
comment: str = None):
"""Create new NmrChain. Setting isConnected=True produces a connected NmrChain.
:param str shortName: shortName for new nmrChain (optional, defaults to '@ijk' or '#ijk', ijk positive integer
:param bool isConnected: (default to False) If true the NmrChain is a connected stretch. This can NOT be changed later
:param str label: Modifiable NmrChain identifier that does not change with reassignment. Defaults to '@ijk'/'#ijk'
:param str comment: comment for new nmrChain (optional)
:return: a new NmrChain instance.
"""
from ccpn.core.NmrChain import _newNmrChain
return _newNmrChain(self, shortName=shortName, isConnected=isConnected,
label=label, comment=comment)
[docs] @logCommand('project')
def fetchNmrChain(self, shortName: str = None):
"""Fetch chain with given shortName; If none exists call newNmrChain to make one first
If shortName is None returns a new NmrChain with name starting with '@'
:param shortName: string name of required nmrAtom
:return: an NmrChain instance.
"""
from ccpn.core.NmrChain import _fetchNmrChain
return _fetchNmrChain(self, shortName=shortName)
[docs] @logCommand('project.')
def produceNmrAtom(self, atomId: str = None, chainCode: str = None,
sequenceCode: Union[int, str] = None,
residueType: str = None, name: str = None):
"""Get chainCode, sequenceCode, residueType and atomName from dot-separated atomId or Pid
or explicit parameters, and find or create an NmrAtom that matches
Empty chainCode gets NmrChain:@- ; empty sequenceCode get a new NmrResidue
:param atomId:
:param chainCode:
:param sequenceCode:
:param residueType:
:param name: new or existing nmrAtom, matching parameters
:return:
"""
from ccpn.core.NmrAtom import _produceNmrAtom
return _produceNmrAtom(self, atomId=atomId, chainCode=chainCode,
sequenceCode=sequenceCode, residueType=residueType, name=name)
[docs] @logCommand('project.')
def newNote(self, name: str = None, text: str = None, comment: str = None, **kwds):
"""Create new Note.
See the Note class for details.
Optional keyword arguments can be passed in; see Note._newNote for details.
:param name: name for the note.
:param text: contents of the note.
:return: a new Note instance.
"""
from ccpn.core.Note import _newNote
return _newNote(self, name=name, text=text, comment=comment, **kwds)
[docs] @logCommand('project.')
def newWindow(self, title: str = None, position: tuple = (), size: tuple = (), **kwds):
"""Create new child Window.
See the Window class for details.
Optional keyword arguments can be passed in; see Window._newWindow for details.
:param str title: window title (optional, defaults to 'W1', 'W2', 'W3', ...
:param tuple position: x,y position for new window in integer pixels.
:param tuple size: x,y size for new window in integer pixels.
:return: a new Window instance.
"""
from ccpn.ui._implementation.Window import _newWindow
return _newWindow(self, title=title, position=position, size=size, **kwds)
[docs] @logCommand('project.')
def newStructureEnsemble(self, name: str = None, data=None, comment: str = None, **kwds):
"""Create new StructureEnsemble.
See the StructureEnsemble class for details.
Optional keyword arguments can be passed in; see StructureEnsemble._newStructureEnsemble for details.
:param name: new name for the StructureEnsemble.
:param data: Pandas dataframe.
:param comment: optional comment string
:return: a new StructureEnsemble instance.
"""
from ccpn.core.StructureEnsemble import _newStructureEnsemble
return _newStructureEnsemble(self, name=name, data=data, comment=comment, **kwds)
[docs] @logCommand('project.')
def newDataTable(self, name: str = None, data=None, comment: str = None, **kwds):
"""Create new DataTable.
See the DataTable class for details.
Optional keyword arguments can be passed in; see DataTable._newDataTable for details.
:param name: new name for the DataTable.
:param data: Pandas dataframe.
:param comment: optional comment string
:return: a new DataTable instance.
"""
from ccpn.core.DataTable import _newDataTable
return _newDataTable(self, name=name, data=data, comment=comment, **kwds)
[docs] @logCommand('project.')
def fetchDataTable(self, name: str):
"""Get or create new DataTable.
:param name: name for the DataTable.
"""
from ccpn.core.DataTable import _fetchDataTable
return _fetchDataTable(self, name=name)
[docs] @logCommand('project.')
def newPeakCluster(self, peaks: Sequence[Union['Peak', str]] = None, **kwds) -> Optional['PeakCluster']:
"""Create new PeakCluster.
See the PeakCluster class for details.
Optional keyword arguments can be passed in; see PeakCluster._newPeakCluster for details.
:param peaks: optional list of peaks as objects or pids.
:return: a new PeakCluster instance.
"""
from ccpn.core.PeakCluster import _newPeakCluster
return _newPeakCluster(self, peaks=peaks, **kwds)
[docs] @logCommand('project.')
def newCollection(self, items: Sequence[typing.Any] = None, **kwds) -> Optional['Collection']:
"""Create new Collection.
See the Collection class for details.
Optional keyword arguments can be passed in; see Collection._newCollection for details.
:param items: optional list of core objects as objects or pids.
:return: a new Collection instance.
"""
return self._collectionList.newCollection(items=items, **kwds)
[docs] @logCommand('project.')
def newSample(self, name: str = None, pH: float = None, ionicStrength: float = None,
amount: float = None, amountUnit: str = None, isVirtual: bool = False, isHazardous: bool = None,
creationDate: datetime = None, batchIdentifier: str = None, plateIdentifier: str = None,
rowNumber: int = None, columnNumber: int = None, comment: str = None, **kwds):
"""Create new Sample.
See the Sample class for details.
Optional keyword arguments can be passed in; see Sample._newSample for details.
:param name:
:param pH:
:param ionicStrength:
:param amount:
:param amountUnit:
:param isVirtual:
:param isHazardous:
:param creationDate:
:param batchIdentifier:
:param plateIdentifier:
:param rowNumber:
:param columnNumber:
:param comment:
:param serial: optional serial number.
:return: a new Sample instance.
"""
from ccpn.core.Sample import _newSample
return _newSample(self, name=name, pH=pH, ionicStrength=ionicStrength,
amount=amount, amountUnit=amountUnit, isVirtual=isVirtual, isHazardous=isHazardous,
creationDate=creationDate, batchIdentifier=batchIdentifier, plateIdentifier=plateIdentifier,
rowNumber=rowNumber, columnNumber=columnNumber, comment=comment, **kwds)
[docs] @logCommand('project.')
def fetchSample(self, name: str):
"""Get or create Sample with given name.
See the Sample class for details.
:param self: project
:param name: sample name
:return: new or existing Sample instance.
"""
from ccpn.core.Sample import _fetchSample
return _fetchSample(self, name)
[docs] @logCommand('project.')
def newStructureData(self, name: str = None, title: str = None, programName: str = None, programVersion: str = None,
dataPath: str = None, creationDate: datetime = None, uuid: str = None,
comment: str = None, moleculeFilePath: str = None, **kwds):
"""Create new StructureData
See the StructureData class for details.
Optional keyword arguments can be passed in; see StructureData._newStructureData for details.
:param title: deprecated - original name for StructureData, please use .name
:param name:
:param programName:
:param programVersion:
:param dataPath:
:param creationDate:
:param uuid:
:param comment:
:return: a new StructureData instance.
"""
from ccpn.core.StructureData import _newStructureData
return _newStructureData(self, name=name, title=title, programName=programName, programVersion=programVersion,
dataPath=dataPath, creationDate=creationDate, uuid=uuid, comment=comment,
moleculeFilePath=moleculeFilePath, **kwds)
[docs] @logCommand('project.')
def newSpectrumGroup(self, name: str, spectra=(), **kwds):
"""Create new SpectrumGroup
See the SpectrumGroup class for details.
Optional keyword arguments can be passed in; see SpectrumGroup._newSpectrumGroup for details.
:param name: name for the new SpectrumGroup
:param spectra: optional list of spectra as objects or pids
:return: a new SpectrumGroup instance.
"""
from ccpn.core.SpectrumGroup import _newSpectrumGroup
return _newSpectrumGroup(self, name=name, spectra=spectra, **kwds)
[docs] @logCommand('project.')
def createChain(self, sequence: Union[str, Sequence[str]], compoundName: str = None,
startNumber: int = 1, molType: str = None, isCyclic: bool = False,
shortName: str = None, role: str = None, comment: str = None,
expandFromAtomSets: bool = True,
addPseudoAtoms: bool = True, addNonstereoAtoms: bool = True,
**kwds):
"""Create new chain from sequence of residue codes, using default variants.
Automatically creates the corresponding polymer Substance if the compoundName is not already taken
See the Chain class for details.
Optional keyword arguments can be passed in; see Chain._createChain for details.
:param Sequence sequence: string of one-letter codes or sequence of residue types
:param str compoundName: name of new Substance (e.g. 'Lysozyme') Defaults to 'Molecule_n
:param str molType: molType ('protein','DNA', 'RNA'). Needed only if sequence is a string.
:param int startNumber: number of first residue in sequence
:param str shortName: shortName for new chain (optional)
:param str role: role for new chain (optional)
:param str comment: comment for new chain (optional)
:param bool expandFromAtomSets: Create new Atoms corresponding to the ChemComp AtomSets definitions.
Eg. H1, H2, H3 equivalent atoms will add a new H% atom. This will facilitate assignments workflows.
See ccpn.core.lib.MoleculeLib.expandChainAtoms for details.
:return: a new Chain instance.
"""
from ccpn.core.Chain import _createChain
return _createChain(self, sequence=sequence, compoundName=compoundName,
startNumber=startNumber, molType=molType, isCyclic=isCyclic,
shortName=shortName, role=role, comment=comment,
expandFromAtomSets=expandFromAtomSets, addPseudoAtoms=addPseudoAtoms,
addNonstereoAtoms=addNonstereoAtoms, **kwds)
[docs] @logCommand('project.')
def newSubstance(self, name: str = None, labelling: str = None, substanceType: str = 'Molecule',
userCode: str = None, smiles: str = None, inChi: str = None, casNumber: str = None,
empiricalFormula: str = None, molecularMass: float = None, comment: str = None,
synonyms: typing.Sequence[str] = (), atomCount: int = 0, bondCount: int = 0,
ringCount: int = 0, hBondDonorCount: int = 0, hBondAcceptorCount: int = 0,
polarSurfaceArea: float = None, logPartitionCoefficient: float = None, **kwds):
"""Create new substance WITHOUT storing the sequence internally
(and hence not suitable for making chains). SubstanceType defaults to 'Molecule'.
ADVANCED alternatives are 'Cell' and 'Material'
See the Substance class for details.
Optional keyword arguments can be passed in; see Substance._newSubstance for details.
:param name:
:param labelling:
:param substanceType:
:param userCode:
:param smiles:
:param inChi:
:param casNumber:
:param empiricalFormula:
:param molecularMass:
:param comment:
:param synonyms:
:param atomCount:
:param bondCount:
:param ringCount:
:param hBondDonorCount:
:param hBondAcceptorCount:
:param polarSurfaceArea:
:param logPartitionCoefficient:
:return: a new Substance instance.
"""
from ccpn.core.Substance import _newSubstance
return _newSubstance(self, name=name, labelling=labelling, substanceType=substanceType,
userCode=userCode, smiles=smiles, inChi=inChi, casNumber=casNumber,
empiricalFormula=empiricalFormula, molecularMass=molecularMass, comment=comment,
synonyms=synonyms, atomCount=atomCount, bondCount=bondCount,
ringCount=ringCount, hBondDonorCount=hBondDonorCount, hBondAcceptorCount=hBondAcceptorCount,
polarSurfaceArea=polarSurfaceArea, logPartitionCoefficient=logPartitionCoefficient, **kwds)
[docs] @logCommand('project.')
def fetchNefSubstance(self, sequence: typing.Sequence[dict], name: str = None, **kwds):
"""Fetch Substance that matches sequence of NEF rows and/or name
See the Substance class for details.
Optional keyword arguments can be passed in; see Substance._fetchNefSubstance for details.
:param self:
:param sequence:
:param name:
:return: a new Nef Substance instance.
"""
from ccpn.core.Substance import _fetchNefSubstance
return _fetchNefSubstance(self, sequence=sequence, name=name, **kwds)
[docs] @logCommand('project.')
def getNefSubstance(self, sequence: typing.Sequence[dict], name: str = None, **kwds):
"""Get existing Substance that matches sequence of NEF rows and/or name
See the Substance class for details.
Optional keyword arguments can be passed in; see Substance._fetchNefSubstance for details.
:param self:
:param sequence:
:param name:
:return: a new Nef Substance instance.
"""
from ccpn.core.Substance import _getNefSubstance
return _getNefSubstance(self, sequence=sequence, name=name, **kwds)
[docs] @logCommand('project.')
def createPolymerSubstance(self, sequence: typing.Sequence[str], name: str,
labelling: str = None, userCode: str = None, smiles: str = None,
synonyms: typing.Sequence[str] = (), comment: str = None,
startNumber: int = 1, molType: str = None, isCyclic: bool = False,
**kwds):
"""Make new Substance from sequence of residue codes, using default linking and variants
NB: For more complex substances, you must use advanced, API-level commands.
See the Substance class for details.
Optional keyword arguments can be passed in; see Substance._fetchNefSubstance for details.
:param Sequence sequence: string of one-letter codes or sequence of residueNames
:param str name: name of new substance
:param str labelling: labelling for new substance. Optional - None means 'natural abundance'
:param str userCode: user code for new substance (optional)
:param str smiles: smiles string for new substance (optional)
:param Sequence[str] synonyms: synonyms for Substance name
:param str comment: comment for new substance (optional)
:param int startNumber: number of first residue in sequence
:param str molType: molType ('protein','DNA', 'RNA'). Required only if sequence is a string.
:param bool isCyclic: Should substance created be cyclic?
:return: a new Substance instance.
"""
from ccpn.core.Substance import _createPolymerSubstance
return _createPolymerSubstance(self, sequence=sequence, name=name,
labelling=labelling, userCode=userCode, smiles=smiles,
synonyms=synonyms, comment=comment,
startNumber=startNumber, molType=molType, isCyclic=isCyclic,
**kwds)
[docs] @logCommand('project.')
def fetchSubstance(self, name: str, labelling: str = None):
"""Get or create Substance with given name and labelling.
See the Substance class for details.
:param self:
:param name:
:param labelling:
:return: new or existing Substance instance.
"""
from ccpn.core.Substance import _fetchSubstance
return _fetchSubstance(self, name=name, labelling=labelling)
[docs] @logCommand('project.')
def newComplex(self, name: str, chains=(), **kwds):
"""Create new Complex.
See the Complex class for details.
Optional keyword arguments can be passed in; see Complex._newComplex for details.
:param name:
:param chains:
:return: a new Complex instance.
"""
from ccpn.core.Complex import _newComplex
return _newComplex(self, name=name, chains=chains, **kwds)
[docs] @logCommand('project.')
def newChemicalShiftList(self, name: str = None, spectra=(), **kwds):
"""Create new ChemicalShiftList.
See the ChemicalShiftList class for details.
:param name:
:param spectra:
:return: a new ChemicalShiftList instance.
"""
from ccpn.core.ChemicalShiftList import _newChemicalShiftList
return _newChemicalShiftList(self, name=name, spectra=spectra, **kwds)
@logCommand('project.')
def getChemicalShiftList(self, name: str = None, **kwds):
"""Get existing ChemicalShiftList.
See the ChemicalShiftList class for details.
:param name:
:return: a new ChemicalShiftList instance.
"""
from ccpn.core.ChemicalShiftList import _getChemicalShiftList
return _getChemicalShiftList(self, name=name, **kwds)
[docs] def getCollection(self, name: str) -> Optional['Collection']:
"""Return the collection from the supplied name
"""
from ccpn.core.Collection import _getCollection
return _getCollection(self, name=name)
[docs] @logCommand('project.')
def fetchCollection(self, name: str = None) -> 'Collection':
"""Get or create Collection.
See the Collection class for details.
:param name:
:return: a new Collection instance.
"""
from ccpn.core.Collection import _fetchCollection
return _fetchCollection(self, name=name)
#=========================================================================================
# Code adapted from prior _implementation/Io.py
#=========================================================================================
def _loadProject(application, path: str) -> Project:
"""Load the project defined by path
:return Project instance
"""
from ccpn.core._implementation.updates.update_v2 import updateProject_fromV2
_path = aPath(path)
if not _path.exists():
raise ValueError(f'Path {_path} does not exist')
if (apiProject := apiIo.loadProject(str(path), useFileLogger=True)) is None:
raise RuntimeError("No valid project loaded from %s" % path)
apiNmrProject = apiProject.fetchNmrProject()
apiNmrProject.initialiseData()
apiNmrProject.initialiseGraphicsData()
project = Project(apiNmrProject)
project._isNew = False
# NB: linkages are set in Framework._intialiseProject()
# If path pointed to a V2 project, save the result
if project._isUpgradedFromV2:
try:
# call the update
getLogger().info('==> Upgrading %s to version-3' % project)
updateProject_fromV2(project)
# Using api calls as V3-Project has not yet been fully instantiated
apiProject.touch()
apiProject.save()
getLogger().info('==> Writing model data')
except Exception as es:
getLogger().warning('Failed upgrading %s (%s)' % (project, str(es)))
else:
# check if it has been moved
projectPath = project.path
oldName = project.name
newName = aPath(projectPath).basename
if oldName != newName:
# Directory name has changed. Change project name and move Project xml file.
oldProjectFilePath = aPath(ApiPath.getProjectFile(projectPath, oldName))
if oldProjectFilePath.exists():
oldProjectFilePath.removeFile()
apiProject.__dict__['name'] = newName
# Using api calls as V3-Project has not yet been fully instantiated
apiProject.touch()
apiProject.save()
project._resetUndo(debug=application._debugLevel <= Logging.DEBUG2, application=application)
# Do some admin
# need project.path, as it may have have changed; e.g. for a V2 project
project._saveHistory = fetchProjectSaveHistory(project.path)
# the initialisation is completed by Framework when it has done its things
# project._initialiseProject()
return project
def _newProject(application, name: str = 'default', path: str = None, overwrite=False) -> Project:
"""Make new project, putting underlying data storage (API project) at path
:return Project instance
"""
# apiIo.newProject will create a temp path if path is None
if (apiProject := apiIo.newProject(name, path, overwriteExisting=overwrite, useFileLogger=True)) is None:
raise RuntimeError("New project could not be created (overlaps exiting project?) name:%s, path:%s, overwrite:"
% (name, path, overwrite))
apiNmrProject = apiProject.fetchNmrProject()
apiNmrProject.initialiseData()
apiNmrProject.initialiseGraphicsData()
project = Project(apiNmrProject)
project._isNew = True
# NB: linkages are set in Framework._initialiseProject()
project._objectVersion = application.applicationVersion
project._resetUndo(debug=application._debugLevel <= Logging.DEBUG2, application=application)
project._saveHistory = newProjectSaveHistory(project.path)
# the initialisation is completed by Framework when it has done its things
# project._initialiseProject()
return project