"""Basic API (data storage) I/O functions
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Geerten Vuister $"
__dateModified__ = "$dateModified: 2022-02-24 12:37:10 +0000 (Thu, February 24, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:48 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
import glob
import os
import sys
import tempfile
import posixpath
from ccpn.util import Common as commonUtil
from ccpn.util import Logging
from ccpn.util import Path
from ccpn.framework.PathsAndUrls import CCPN_DIRECTORY_SUFFIX, CCPN_ARCHIVES_DIRECTORY, \
CCPN_STATE_DIRECTORY
from ccpnmodel.ccpncore.api.memops import Implementation
from ccpnmodel.ccpncore.lib import ApiPath
from ccpnmodel.ccpncore.memops.ApiError import ApiError
from ccpnmodel.ccpncore.memops.metamodel import Constants as metaConstants
# NBNB TBD this should be done by putting the function inside the class - later
from ccpnmodel.ccpncore.api.ccp.general.DataLocation import AbstractDataStore
# Necessary because shutil fails in permission copying for windows file systems,
# and the error is not properly caught on some VMs.
import ccpn.util.LocalShutil as shutil
# CCPN_DIRECTORY_SUFFIX = ApiPath.CCPN_DIRECTORY_SUFFIX
addCcpnDirectorySuffix = ApiPath.addCcpnDirectorySuffix
removeCcpnDirectorySuffix = ApiPath.removeCcpnDirectorySuffix
# CCPN_ARCHIVES_DIRECTORY = ApiPath.CCPN_ARCHIVES_DIRECTORY
[docs]class DefaultIoHandler:
"""Class to handle interactions with user and logging
Should be subclassed for actual functionality
This default class simply does nothing"""
def _createLogger(project, applicationName=None, useFileLogger=None):
if applicationName is None:
applicationName = project._applicationName if hasattr(project, '_applicationName') else 'ccpn'
if useFileLogger is None:
useFileLogger = project._useFileLogger
logger = (Logging.createLogger(applicationName, project, stream=sys.stderr) if useFileLogger
else Logging.getLogger())
project._applicationName = applicationName
project._useFileLogger = useFileLogger
project._logger = logger
return logger
# def ccpnProjectPath(path:str):
# if not path.endswith(CCPN_DIRECTORY_SUFFIX):
# path += CCPN_DIRECTORY_SUFFIX
# return path
#
# def ccpnProjectPathPrefix(path:str):
# if path.endswith(CCPN_DIRECTORY_SUFFIX):
# path = path[:-len(CCPN_DIRECTORY_SUFFIX)]
#
# return path
[docs]def newProject(projectName, path:str=None, overwriteExisting:bool=False, applicationName='ccpn',
useFileLogger:bool=True) -> Implementation.MemopsRoot:
"""
Create, and return, a new project using a specified path (directory).
If path is not specified it takes the current working directory.
The path can be either absolute or relative.
The 'userData' repository is pointed to the path.
The 'backup' repository is pointed to the path + '_backup' + CCPN_DIRECTORY_SUFFIX.
If either of these paths already exist (either as files or as directories):
If overwriteExisting:
Delete the path
Else if showYesNo:
Ask the user if it is ok to delete the path
If yes, delete. If no return None.
Else:
Raise an IOError
"""
# relies on knowing that repositories to move have these names, and these values for path suffix
repositoryNameMap = {'userData': '', 'backup': Path.CCPN_BACKUP_SUFFIX}
if path:
for name in repositoryNameMap.keys():
fullPath = Path.joinPath(path, projectName) + repositoryNameMap[name]
fullPath = addCcpnDirectorySuffix(fullPath)
if not absentOrRemoved(Path.joinPath(fullPath, 'memops', 'Implementation'),
overwriteExisting):
errMsg = 'Path ("%s") contains existing project.' % fullPath
Logging.getLogger().warning(errMsg)
raise Exception(errMsg)
temporaryDirectory = None
path = addCcpnDirectorySuffix(path)
else:
temporaryDirectory = tempfile.TemporaryDirectory(prefix='CcpnProject_',
suffix=CCPN_DIRECTORY_SUFFIX)
path = temporaryDirectory.name
project = Implementation.MemopsRoot(name=projectName)
if temporaryDirectory:
project._temporaryDirectory = temporaryDirectory
for name in repositoryNameMap.keys():
fullPath = Path.normalisePath(Path.joinPath(path, projectName)
+ repositoryNameMap[name], makeAbsolute=True)
repository = project.findFirstRepository(name=name)
repository.url = Implementation.Url(path=fullPath)
# Reset DataLocations
_initialiseStandardDataLocationStore(project)
# NBNB PREFERENCES
# Put here code to set remotedata location from preferences:
#
# remotePath = SOMETHING
# obj = project.findFirstDataLocationStore(name='standard').findFirstDataUrl(name='remoteData')
# obj.url = Implementation.Url(path=remotePath)
# create logger
logger = _createLogger(project, applicationName, useFileLogger)
logger.info("Located at %s" % path)
return project
[docs]def absentOrRemoved(path:str, overwriteExisting:bool=False) -> bool:
"""Check if file is present, possibly removing it first.
If path already exists:
If overwriteExisting:
Delete the path
Return True
Else if showYesNo:
Ask the user if it is ok to delete the path
If yes, delete and return True. If no return False.
Else:
return False
Else:
Return True
This function is not intended to be used outside this module but could be.
"""
if os.path.exists(path):
if overwriteExisting:
Path.deletePath(path)
return True
# elif showYesNo:
# if os.path.isdir(path):
# files = os.listdir(path)
# n = 5
# if len(files) > n:
# files = files[:n]
# files.append('...')
# ss = ', '.join(files)
# message = '%s already exists, remove it and all of its contents (%s) (if no, then no action taken)?' % (path, ss)
# if not showYesNo('Remove directory', message):
# return False
# else:
# Path.deletePath(path)
# return True
# else:
# message = '%s already exists (not as a directory), remove it?' % path
# if not showYesNo('Remove file', message):
# return False
# else:
# Path.deletePath(path)
# return True
else:
return False
else:
return True
[docs]def loadProject(path:str, projectName:str=None, askFile:"function"=None,
askDir:"function"=None, suppressGeneralDataDir:bool=False,
fixDataStores=True, applicationName='ccpn',
useFileLogger:bool=True) -> Implementation.MemopsRoot:
"""
Loads a project file and checks and deletes unwanted project repositories and
changes the project repository path if the project has moved. Returns the project.
(The project repository path is effectively the userData repository.)
askFile (if not None) has signature askFile(title, message, initial_value = '')
askDir (if not None) has signature askDir(title, message, initial_value = '')
Throws an IOError if there is an I/O error.
Throws an ApiError if there is an API exception.
"""
from ccpnmodel.ccpncore.memops.format.xml import XmlIO
def isGeneralDataWriteable(generalDataRep):
ppath = generalDataRep.url.path
return commonUtil.isWindowsOS() or os.access(ppath, os.W_OK|os.X_OK)
def isGeneralDataOk(proj):
if suppressGeneralDataDir:
return True
generalDataRep = proj.findFirstRepository(name='generalData')
if generalDataRep:
return isGeneralDataWriteable(generalDataRep)
return True
path = Path.normalisePath(path, makeAbsolute=True)
# Copies V2 projects to V3-compliant location, does some path clean-up and returns new path
path = copyV2ToV3Location(path)
debugMessages = []
infoMessages = []
# check if path exists and is directory
if not os.path.exists(path):
raise IOError('path "%s" does not exist' % path)
if not os.path.isdir(path):
raise IOError('path "%s" is not a directory' % path)
projectFile = ApiPath.getProjectFile(path, projectName)
if projectName:
# projectName was specified so projectFile better exist
if not os.path.exists(projectFile):
raise IOError('project file "%s" does not exist' % projectFile)
else:
# projectName was not specified so default projectFile might not exist
if not os.path.exists(projectFile):
projectFiles = ApiPath.getPossibleProjectFiles(path)
if len(projectFiles) == 0:
raise IOError('"%s" contains no project file' % path)
elif len(projectFiles) == 1:
projectFile = projectFiles[0]
elif askFile:
projectFile = askFile('Select project file', 'Select project file',
initial_value=projectFiles[0])
if not projectFile: # cancelled
raise IOError('Cancelled')
else:
raise IOError('"%s" contains %d project files, not sure which to use' % (path,
len(projectFiles)))
# TBD: should projectName be based on projectFile or on path???
# the way it is set here do not need to change project.name
# but if you used the path then you would need to change it
projectName = ApiPath.getTopObjIdFromFileName(projectFile)
# doing the loadProject twice has a bit of an overhead, but not much
try:
# below assumes TopObjects exist where stated, so might fail
project = XmlIO.loadProject(path, projectName, partialLoad=False)
except:
debugMessages.append("First loading attempt failed - compatibility problem?")
project = None
if project is not None and (not ApiPath.areAllTopObjectsPresent(project) or
not isGeneralDataOk(project)):
# if not all loaded (shell) TopObjects can be found, try again
project = None
debugMessages.append("Some files wer not found - project may have moved.")
if project is None:
debugMessages.append("Re-trying load, skipping cached TopObjects:")
project = XmlIO.loadProject(path, projectName, partialLoad=True)
# try and fix project repository path, if needed
packageLocator = project.packageLocator
repositories = packageLocator.repositories
for repository in repositories:
if repository.url.path == path:
oldPath = path
break
else:
# change first repository path to point to this one, and also change backup
repository = repositories[0]
oldPath = repository.url.path
infoMessages.append('Project file has moved; adjusting ...')
repository.url = Implementation.Url(path=path)
# Necessary because activeRepositories are not set right
# if file names do not match:
project.__dict__['activeRepositories'].append(repository)
projectRepository = repository
# check backup path
backupRepository = project.findFirstRepository(name="backup")
if backupRepository:
backupUrl = backupRepository.url
oldBackupPath = removeCcpnDirectorySuffix(backupUrl.path)
newBackupPath = removeCcpnDirectorySuffix(path) + Path.CCPN_BACKUP_SUFFIX
oldPathPrefix = removeCcpnDirectorySuffix(oldPath)
if oldBackupPath.startswith(oldPathPrefix): # hopefully true
if path != oldPath:
oldBackupPath = addCcpnDirectorySuffix(oldBackupPath)
newBackupPath = addCcpnDirectorySuffix(newBackupPath)
infoMessages.append('Backup changed to "%s"' % (newBackupPath))
backupRepository.url = Implementation.Url(path=newBackupPath)
else:
oldBackupPath = addCcpnDirectorySuffix(oldBackupPath)
if not os.path.exists(oldBackupPath):
newBackupPath = addCcpnDirectorySuffix(newBackupPath)
infoMessages.append('Backup changed to "%s"' % (newBackupPath))
backupRepository.url = Implementation.Url(path=newBackupPath)
# check if project repository is called 'userData'
if projectRepository.name != 'userData':
infoMessages.append('Project has non-standard repository name "%s"' %
projectRepository.name)
# repoint dataStores that are in same directory as project
# (but only if old path does not exist and new one does)
if path != oldPath:
oldDirectory = os.path.dirname(oldPath)
newDirectory = os.path.dirname(path)
for dataLocationStore in project.dataLocationStores:
for dataStore in dataLocationStore.dataStores:
fullPath = dataStore.fullPath
if not os.path.exists(fullPath):
dataUrl = dataStore.dataUrl
dataLocation = dataUrl.url.dataLocation
if (dataLocation == oldDirectory or
(dataLocation.startswith(oldDirectory) and dataLocation[len(oldDirectory)] ==
Path.dirsep)):
newDataUrlPath = newDirectory + dataLocation[len(oldDirectory):]
newPath = Path.joinPath(newDataUrlPath, dataStore.path)
if os.path.exists(newPath):
infoMessages.append('Data "%s": changed path to "%s"' %
(dataStore.dataLocationStore.name, newDataUrlPath))
dataUrl.url = dataUrl.url.clone(path=newDataUrlPath)
# change refData to current one if need be
refDataRepository = project.findFirstRepository(name='refData')
if refDataRepository:
oldPath = refDataRepository.url.path
newPath = Path.joinPath(Path.getPathToImport('ccpnmodel'), 'data')
if newPath != oldPath:
debugMessages.append('refData has been changed from "%s" to "%s"' % (oldPath, newPath))
refDataRepository.url = Implementation.Url(path=newPath)
else:
debugMessages.append('Project has no repository with name "refData"')
# change generalData to current one if need be
generalDataRepository = project.findFirstRepository(name='generalData')
if generalDataRepository and not suppressGeneralDataDir:
oldPath = generalDataRepository.url.path
if not os.path.exists(oldPath) or not isGeneralDataWriteable(generalDataRepository):
newPath = Path.normalisePath(os.path.expanduser('~/.ccpn/data'))
if not os.path.exists(newPath):
os.makedirs(newPath)
debugMessages.append('generalData has been changed from "%s" to "%s"' % (oldPath, newPath))
generalDataRepository.url = Implementation.Url(path=newPath)
# check other repository paths
for repository in project.repositories:
if repository not in (projectRepository, refDataRepository, backupRepository, generalDataRepository):
oldPath = repository.url.path
if not repository.stored:
# title = 'Repository being deleted'
# msg = 'Repository "%s" with path "%s" has no packageLocators, deleting' % (repository.name, oldPath)
repository.delete()
elif not os.path.exists(oldPath):
msg = 'Repository "%s" path "%s" does not exist' % (repository.name, oldPath)
debugMessages.append(msg)
debugMessages.append('List of packageLocators for repository "%s":' % repository.name)
for packageLocator in repository.stored:
debugMessages.append(' %s' % packageLocator.targetName)
if askDir:
title = 'Repository path does not exist'
newPath = askDir(title, msg + ': enter new path', initial_value=oldPath)
while newPath and not os.path.exists(newPath):
msg = 'Path "%s" does not exist' % newPath
newPath = askDir(title, msg + ': enter new path', initial_value=newPath)
if newPath:
repository.url = Implementation.Url(path=Path.normalisePath(newPath))
debugMessages.append("New path set: %s" % newPath)
else:
debugMessages.append(msg)
# check and fix dataLocationStores
if fixDataStores:
dataStores = []
for dataLocationStore in project.dataLocationStores:
for dataStore in dataLocationStore.dataStores:
# # We cannot prune these, as it causes them to be loaded out of turn.
# # NBNB keep this comment, to remind of the problem
# if hasattr(dataStore, 'nmrDataSources') and not dataStore.nmrDataSources:
# debugMessages.append('deleting empty dataStore %s with path %s'
# % (dataStore, dataStore.fullPath))
# dataStore.delete()
# # We do not use these, and if we ever did, who knows what else they might be used for
# # elif isinstance(dataStore, MimeTypeDataStore) and not dataStore.nmrDataSourceImages:
# # debugMessages.append('deleting empty dataStore %s with path %s'
# # % (dataStore, dataStore.fullPath))
# # dataStore.delete()
# else:
# dataStores.append(dataStore)
dataStores.append(dataStore)
badDataStores = [dataStore for dataStore in dataStores
if not os.path.exists(dataStore.fullPath)]
if badDataStores:
# find DataUrls involved
dataUrls = set(dataStore.dataUrl for dataStore in badDataStores)
# NBNB change here to possibly start a directory higher NBNB TBD
# NB the following gets you the project directory (the one containing memops/)
startDir = project.packageLocator.findFirstRepository().url.dataLocation
for dataUrl in dataUrls:
if not dataUrl.dataStores.difference(badDataStores):
# all DataStores for this DataUrl are bad
# we can make changes without affecting 'good' DataStores
# Look for an obvious place the data may have moved to
dataStores = dataUrl.sortedDataStores()
fullPaths = [dataStore.fullPath for dataStore in dataStores]
baseDir, newPaths = Path.suggestFileLocations(fullPaths,startDir=startDir)
if baseDir is not None:
# We have a file location that fits all missing files.
# Change dataStores to use it
debugMessages.append('resetting data locations to: %s' % baseDir)
AbstractDataStore.changeDataStoreUrl(dataStores[0], baseDir)
for ii,dataStore in enumerate(dataStores):
dataStore.path = newPaths[ii]
# Special hack for moving data of renamed packages on upgrade
for newName, oldName in project._movedPackageNames.items():
movePackageData(project, newName, oldName)
logger = _createLogger(project, applicationName, useFileLogger)
# initialise standard DataUrls and move dataStores to standard DataUrls where possible.
_initialiseStandardDataLocationStore(project)
_compressDataLocations(project)
if debugMessages:
# log warnings
for msg in debugMessages:
logger.debug(msg)
if infoMessages:
# info messages
for msg in infoMessages:
logger.info(msg)
# NBNB Hack: do data upgrade for V2-V3transition
# TBD FIXME remove for future versions
if project._upgradedFromV2:
from ccpnmodel.v_3_0_2.upgrade import correctFinalResult
correctFinalResult(project)
project.checkAllValid()
# Necessary to avoid having a V2 project inside V3 directory structure
project.saveModified()
#
return project
[docs]def cleanupProject(project):
"""Clean up project preparatory to closing (close log handlers etc.)"""
# clear log handlers before deleting
_clearLogHandlers(project)
# delete temporary project directory, if there is one
deleteTemporaryDirectory(project)
def _clearLogHandlers(project):
"""clear all log handlers attached to the project"""
if hasattr(project, '_logger'):
logger = project._logger
for handler in logger.handlers[:]:
logger.removeHandler(handler)
def _cleanupTemporaryLogging(project):
"""remove any logging pointing to the temporary directory"""
if hasattr(project, '_logger') and hasattr(project, '_temporaryDirectory'):
logger = project._logger
if logger.logPath.startswith(project._temporaryDirectory.name):
_clearLogHandlers(project)
[docs]def deleteTemporaryDirectory(project):
"""delete temporary project directory, if there is one"""
if hasattr(project, '_temporaryDirectory'):
_cleanupTemporaryLogging(project)
project._temporaryDirectory.cleanup()
del project._temporaryDirectory
[docs]def saveProject(project, newPath=None, changeBackup=True,
createFallback=False, overwriteExisting=False,
checkValid=False, changeDataLocations=False,
useFileLogger:bool=True) -> bool:
"""
Save the userData for a project to a location given by newPath (the url.path
of the userData repository) if set, or the existing location if not.
Return True if save succeeded otherwise return False (or throw error)
NB Changes to project in the function can NOT be undone, but previous contents of the undo
queue are left active, so you can undo backwards.
If userData does not exist then throws IOError.
If newPath is not specified then it is set to oldPath.
If changeBackup, then also changes backup URL path for project.
If createFallback, then makes copy of existing modified topObjects
files (in newPath, not oldPath) before doing save::
If newPath != oldPath and newPath exists (either as file or as directory):
If overwriteExisting:
Delete the newPath.
Else if showYesNo:
Ask the user if it is ok to delete the newPath
If yes, delete. If no, return without saving.
Else:
Raise an IOError
Elif newProjectName != oldProjectName and there exists corresponding path (file/directory):
If overwriteExisting:
Delete the path.
Else if showYesNo:
Ask the user if it is ok to delete the path.
If yes, delete. If no, return without saving.
Else:
Raise an IOError
If checkValid then does checkAllValid on project
If changeDataLocations then copy to project directory
If there is no exception or early return then at end userData is pointing to newPath.
Return True if save done, False if not (unless there is an exception)
"""
undo = project._undo
if undo is not None:
# TODO NBNB This should be put in a proper try-except block to avoid
# errors preventing reset
undo.increaseBlocking()
logger = project._logger
# check project valid (so don't save an obviously invalid project)
if checkValid:
project.checkAllValid()
# only want to change path for userData
userData = project.findFirstRepository(name='userData')
if not userData:
raise IOError('Problem: userData not found')
oldPath = userData.url.path
if newPath:
# normalise newPath
newPath = Path.normalisePath(newPath, makeAbsolute=True)
newPath = addCcpnDirectorySuffix(newPath)
else:
# NB this ensures that if we are going from V2 to V3 it will save in a new place
newPath = addCcpnDirectorySuffix(oldPath)
if newPath != oldPath:
project._logger.info("Project has been upgraded - saved in new location: %s "
% os.path.basename(newPath))
# set newProjectName
oldProjectName = project.name
if newPath == oldPath:
newProjectName = oldProjectName
else:
newProjectName = os.path.basename(newPath)
newProjectName = removeCcpnDirectorySuffix(newProjectName)
# below is because of data model limit
newProjectName = newProjectName[:32]
# if newProjectName != oldProjectName:
# _renameProject(project, newProjectName)
# if newPath same as oldPath, check if newProjectName already exists if it's not same as oldProjectName
# if newPath == oldPath:
# if newProjectName != oldProjectName:
# location = ApiPath.getTopObjectPath(project)
# if not absentOrRemoved(location, overwriteExisting, showYesNo):
# project.__dict__['name'] = oldProjectName # TBD: for now name is frozen so change this way
# # deleteTemporaryDirectory(project)
# if undo is not None:
# undo.decreaseBlocking()
# return False
# else: # check instead if newPath already exists
if newPath != oldPath:
if absentOrRemoved(newPath, overwriteExisting):
upDir = os.path.dirname(newPath)
if not os.path.exists(upDir):
os.makedirs(upDir)
if newProjectName != oldProjectName:
_renameProject(project, newProjectName)
logger.info('Renamed project %s to %s' % (oldProjectName, newProjectName))
else:
if undo is not None:
undo.decreaseBlocking()
logger = _createLogger(project, useFileLogger=useFileLogger)
logger.warning("Aborting Project.save - new target path already exists: %s" % newPath)
return False
# check if any topObject activeRepository is not either of above
refData = project.findFirstRepository(name='refData')
genData = project.findFirstRepository(name='generalData')
topObjects = []
repositories = set()
for topObject in project.topObjects:
repository = topObject.findFirstActiveRepository()
if repository and repository not in (userData, refData, genData):
topObjects.append(topObject)
repositories.add(repository)
if topObjects:
logger.warning('TopObjects %s, in repositories %s, being left in original locations'
% (topObjects, repositories))
oldUrl = userData.url
if changeBackup:
# change project backup url to point to new path
backupRepository = project.findFirstRepository(name="backup")
if backupRepository:
oldBackupUrl = backupRepository.url
else:
changeBackup = False
try:
# copy userData files over
if newPath != oldPath:
# if os.path.exists(oldPath): # only copy if this is a directory
if os.path.isdir(oldPath):
# just copy everything from oldPath to newPath
logger.info('Copying directory %s to %s' % (oldPath, newPath))
shutil.copytree(oldPath, newPath)
# but need to remove all implementation files
implPath = ApiPath.getImplementationDirectory(newPath)
#implPath = pathImplDirectory(newPath)
Path.deletePath(implPath)
# and need to repoint dataUrl's that were copied over
oldPathP = oldPath + '/'
for dataLocationStore in project.dataLocationStores:
for dataStore in dataLocationStore.dataStores:
oldDataPath = dataStore.fullPath
if oldDataPath.startswith(oldPathP):
dataUrl = dataStore.dataUrl
oldDataUrlPath = dataUrl.url.dataLocation
if oldDataUrlPath.startswith(oldPathP): # normally true
newDataUrlPath = newPath + oldDataUrlPath[len(oldPath):]
dataUrl.url = Implementation.Url(path=newDataUrlPath)
else: # path split awkwardly between absolute and relative
newDataUrlPath = newPath
dataUrl.url = Implementation.Url(path=newDataUrlPath)
dataStore.path = oldDataPath[len(oldPath):]
# change userData url to point to new path
userData.url = Implementation.Url(path=newPath)
# above will set project.isModified = True
if changeBackup:
# change project backup repository url to point to new path
path = removeCcpnDirectorySuffix(newPath)
backupRepository.url = Implementation.Url(path=path+Path.CCPN_BACKUP_SUFFIX+CCPN_DIRECTORY_SUFFIX)
# change project name
if newProjectName != oldProjectName:
if not project.isModified: # if it isModified it will be saved below
if createFallback:
createTopObjectFallback(project)
project.save()
# create fallbacks and keep track of modified topObjects
modifiedTopObjects = []
if createFallback:
for topObject in (project,)+tuple(project.topObjects):
if not topObject.isDeleted and topObject.isModified:
createTopObjectFallback(topObject)
modifiedTopObjects.append(topObject)
if changeDataLocations:
dataLocationStores = project.sortedDataLocationStores()
userRepository = project.findFirstRepository(name='userData')
userPath = userRepository.url.dataLocation
# 2010 Aug 11: remove data directory from path
#dataPath = joinPath(userPath, 'data')
dataPath = userPath
# 2010 Aug 11: change name
#dataStorePrefix = 'dataStore'
dataStorePrefix = 'spectra'
if os.path.exists(dataPath):
files = [xx for xx in os.listdir(dataPath) if xx.startswith(dataStorePrefix)]
offset = len(files)
else:
offset = 0
copyingList = []
dataUrlDict = {}
for dataLocationStore in dataLocationStores:
for dataStore in dataLocationStore.sortedDataStores():
# wb104: 24 Mar 2010: below check is a complete kludge
# we should check whether dataStore is instance of
# NumericMatrix, etc., but those are in ccp so should
# not be imported here
# in any case, there is no proper way to find out if
# a dataStore is used without explicit knowledge of class
knt = 0
# hicard = 1
for attr in ('nmrDataSourceImage',):
if hasattr(dataStore, attr):
if getattr(dataStore, attr):
knt += 1
# hicard > 1
for attr in ('externalDatas', 'nmrDataSources'):
if hasattr(dataStore, attr):
knt += len(getattr(dataStore, attr))
if knt == 0:
continue
oldFullPath = dataStore.fullPath
if not oldFullPath.startswith(userPath+'/'):
# first figure out new dataUrl path
dataUrl = dataStore.dataUrl
oldPath = dataUrl.url.dataLocation
if dataUrl in dataUrlDict:
newUrl = dataUrlDict[dataUrl]
else:
offset += 1
newUrlPath = '%s%d' % (dataStorePrefix, offset)
newUrlPath = Path.joinPath(dataPath, newUrlPath)
newUrl = dataUrlDict[dataUrl] = Implementation.Url(path=newUrlPath)
# then add to list to copy over if original data exists
if os.path.exists(oldFullPath):
newFullPath = Path.joinPath(newUrl.dataLocation, dataStore.path)
copyingList.append((oldFullPath, newFullPath))
# now copy data files over
nfilesToCopy = len(copyingList)
for n, (oldFullPath, newFullPath) in enumerate(copyingList):
dirName = os.path.dirname(newFullPath)
if not os.path.exists(dirName):
os.makedirs(dirName)
logger.warning('Copying file %s to %s (%d of %d)'
% (oldFullPath, newFullPath, n+1, nfilesToCopy))
shutil.copy(oldFullPath, newFullPath)
# finally change dataUrl paths
for dataUrl in dataUrlDict:
dataUrl.url = dataUrlDict[dataUrl]
# save modifications
# change way doing save in case exception is thrown
if createFallback:
for topObject in modifiedTopObjects:
try:
topObject.save()
except:
location = ApiPath.getTopObjectPath(topObject)
logger.warning('Exception working on topObject %s, file %s' % (topObject, location))
raise
# be safe and do below in case new modifications after
# modifiedTopObjects has been created
project.saveModified()
else:
project.saveModified()
if not commonUtil.isWindowsOS():
os.system('touch "%s"' % newPath) # so that user can see which are most recent
badTopObjects = []
for topObject in modifiedTopObjects:
if not checkFileIntegrity(topObject):
badTopObjects.append(topObject)
if badTopObjects:
logger.warning(
'Incomplete save - one or more files did not save completely, you should check them:')
for topObject in badTopObjects:
logger.warning("Bad save: %s - %s" % (topObject, ApiPath.getTopObjectPath(topObject)))
result = False
else:
result = True
except:
# saveModified failed so revert to old values
result = None
if newProjectName != oldProjectName:
project.__dict__['name'] = oldProjectName # TBD: for now name is frozen so change this way
print("WARNING - error saving. Save did not complete")
if newPath != oldPath:
userData.url = oldUrl
if changeBackup:
backupRepository.url = oldBackupUrl
Path.deletePath(newPath)
raise
finally:
if undo is not None:
undo.decreaseBlocking()
if newPath != oldPath:
# create a new logger if the path has changed
logger = _createLogger(project, useFileLogger=useFileLogger)
if result and (newProjectName != oldProjectName or
removeCcpnDirectorySuffix(newPath) != removeCcpnDirectorySuffix(oldPath)):
# save in newlocation succeeded - remove temporary directories
deleteTemporaryDirectory(project)
return result
[docs]def checkFileAtPath(path):
"""Check that file on disk ends correctly
"""
if not os.path.exists(path):
return False
size = os.path.getsize(path)
fp = open(path, 'rU')
ss = '<!--End of Memops Data-->'
n = len(ss)
fp.seek(size-n-2, 0) # -2 just to be safe in case has \r\n as line ending
data = fp.read().strip()[-n:]
fp.close()
return data == ss
[docs]def checkFileIntegrity(topObject):
"""Check that topObject file on disk ends correctly
"""
path = ApiPath.getTopObjectPath(topObject)
return checkFileAtPath(path)
[docs]def createTopObjectFallback(topObject):
"""
Create backup of topObject in same directory as original file but with '.bak' appended.
This function is not intended to be used outside this module but could be.
"""
logger = Logging.getLogger()
location = ApiPath.getTopObjectPath(topObject)
if not os.path.exists(location):
return
backupLocation = location + '.bak'
if not checkFileAtPath(location) and checkFileAtPath(backupLocation):
# current file no good and current backup good so do not do backup
logger.warning('File at location "%s" not complete so not backing up' % location)
return
# copy rather than move because will need that much disk space in any case
# and sometimes move fails at OS level if file with that name already exists
directory = os.path.dirname(backupLocation)
if not os.path.exists(directory):
os.makedirs(directory)
shutil.copy(location, backupLocation)
def _renameProject(project, newProjectName):
""" Rename project.
"""
oldProjectName = project.name
undo = project._undo
if undo is not None:
undo.increaseBlocking()
logger = Logging.getLogger()
# logger.warning('Renaming project %s to %s' % (project.name, newProjectName))
# change project name
if newProjectName == oldProjectName:
return
else:
project.override = True # TBD: for now name is frozen so change this way
try:
# below constraint is not checked in setName() if override is True so repeat here
isValid = newProjectName.isalnum() # superfluous but faster in most cases
if not isValid:
for cc in newProjectName:
if cc != '_' and not cc.isalnum():
isValid = False
break
else:
isValid = True
if not isValid:
raise ApiError('Illegal project name: %s\n Only alphanumeric or underscore allowed'
% newProjectName)
# below checks for length of name as well
project.name = newProjectName
project.touch()
finally:
project.override = False
if undo is not None:
undo.decreaseBlocking()
undo.newItem(_renameProject, _renameProject, undoArgs=(project,oldProjectName),
redoArgs=(project, newProjectName))
def _downlinkTagsByImport(root):
""" gives you the role names of links from MemopsRoot to TopObjects
in import order, so that imported packages come before importing packages
"""
from ccpnmodel.ccpncore.memops.metamodel import Util as metaUtil
leafPackages = []
packages = [root.metaclass.container.topPackage()]
for pp in packages:
childPackages = pp.containedPackages
if childPackages:
packages.extend(childPackages)
else:
leafPackages.append(pp)
# sort leafPackages by import (imported before importing)
leafPackages = metaUtil.topologicalSortSubgraph(leafPackages,
'accessedPackages')
tags = []
for pp in leafPackages:
cc = pp.topObjectClass
if cc is not None:
pr = cc.parentRole
if pr is not None:
tags.append(pr.otherRole.name)
#
return tags
[docs]def loadAllData(root):
""" Load all data for a given root (version >= 2.0)
"""
# load all new data before modifying IO map
for tag in _downlinkTagsByImport(root):
for topObj in getattr(root, tag):
if not topObj.isLoaded:
topObj.load()
[docs]def backupProject(project, dataLocationStores=None, skipRefData=True, clearOutDir=False):
def modificationTime(path):
return os.stat(path)[8]
backupRepository = project.findFirstRepository(name="backup")
if not backupRepository:
project._logger.warning('Warning: no backup path set, so no backup done')
return
backupUrl = backupRepository.url
backupPath = backupUrl.path
if not dataLocationStores:
dataLocationStores = set()
if clearOutDir:
Path.deletePath(backupPath)
topObjects = tuple(project.topObjects) + (project,)
for topObject in topObjects:
if skipRefData:
repository = topObject.findFirstActiveRepository(name='refData')
if repository:
continue
if topObject.isModified:
topObject.backup()
else:
repository = topObject.findFirstActiveRepository()
if repository:
origFile = ApiPath.findTopObjectPath(repository.url.path, topObject)
if os.path.exists(origFile):
# problem with appending repository.name is that topObject.backup()
# above does not do it this way, so end up with inconsistent backup
###backupDir = joinPath(backupPath, repository.name)
# so use same backup pah as topObject.backup()
backupDir = backupPath
backupFile = ApiPath.findTopObjectPath(backupDir, topObject)
if not os.path.exists(backupFile) or \
(modificationTime(backupFile) < modificationTime(origFile)):
directory = os.path.dirname(backupFile)
if not os.path.exists(directory):
os.makedirs(directory)
shutil.copy(origFile, backupFile)
else:
# one is stuffed
project._logger.warning('Warning: could not backup" "'
' %s since could not find original file "%s"'
% (topObject, origFile))
else:
# one is stuffed
project._logger.warning('Warning: could not backup %s since could not find repository'
% topObject)
dataBackupPath = Path.joinPath(backupPath, 'data')
for dataLocationStore in dataLocationStores:
dataBackupDir = Path.joinPath(dataBackupPath, dataLocationStore.name)
for dataStore in dataLocationStore.dataStores:
origFile = dataStore.dataUrl.url.path
backupFile = Path.joinPath(dataBackupDir, dataStore.path)
if os.path.exists(origFile):
if not os.path.exists(backupFile) or \
(modificationTime(backupFile) < modificationTime(origFile)):
directory = os.path.dirname(backupFile)
if not os.path.exists(directory):
os.makedirs(directory)
shutil.copy(origFile, backupFile)
else:
project._logger.warning('Warning: could not backup dataStore '
'"%s" because could not find original file "%s"'
% (dataStore.name, origFile))
[docs]def modifyPackageLocators(project,repositoryName,repositoryPath,packageNames,resetPackageLocator = True,resetRepository = False):
"""
Resets package locators for specified packages to specified repository.
Use as, for example:
modifyPackageLocators(project,'newChemComps','/mydir/data/chemComps/',('ccp.molecule.ChemComp',
'ccp.molecule.ChemCompCoord'))
Additional args:
- resetPackageLocator: True will reset the package locator completely, removing old info
False will add the repository to the package locator.
- resetRepository: True will reset url for the repository, even if it already exists
False will not reset the url for the repository if it already exists
Returns the relevant repository.
"""
repository = project.findFirstRepository(name = repositoryName)
ss = Path.normalisePath(repositoryPath)
if not repository:
repository = project.newRepository(name= repositoryName,
url=Implementation.Url(path=ss))
elif resetRepository and repository.url.path != repositoryPath:
repository.url = Implementation.Url(path=ss)
for packageName in packageNames:
packageLocator = project.findFirstPackageLocator(targetName = packageName)
if not packageLocator:
raise ApiError("Cannot modify repository 'any' for package %s"
% packageName)
if resetPackageLocator:
packageLocator.repositories = (repository,)
elif not repository in packageLocator.repositories:
packageLocator.addRepository(repository)
return repository
# GWV 20Jan2022: replaced by ProjectArchiver class
# def packageProject(project, filePrefix=None, includeBackups=False, includeData=False, includeLogs=False,
# includeArchives=False, includeSummaries=False):
# """
# Package up project userData into one gzipped tar file.
# If filePrefix is None then instead use the userData path.
# The tar file is filePrefix+".tgz".
# By default only \*.xml files are packaged up.
# If includeBackups then also \*.xml.bak files are included.
# If includeData then also dataStores located inside project directory are included.
# If includeLogs then logs are included.
# If includeArchives then archives are included.
# If includeSummaries then summaries are included.
# """
#
# # NBNB TBD FIXME check how many dataLocations to package (and make sure you reset first)
#
# import datetime
# import tarfile
#
# projectPath = getRepositoryPath(project, 'userData')
#
# if not filePrefix:
# now = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
# filePrefix = '%s-%s' % (os.path.basename(projectPath)[:-len(CCPN_DIRECTORY_SUFFIX)], now)
# filePrefix = os.path.join(projectPath, CCPN_ARCHIVES_DIRECTORY, filePrefix)
#
# if includeData:
# projectPathP = projectPath + '/'
# n = len(projectPath) + 1
# includedDataPaths = set()
# for dataLocationStore in project.dataLocationStores:
# for dataStore in dataLocationStore.dataStores:
# fullPath = dataStore.fullPath
# if fullPath.startswith(projectPathP):
# includedDataPaths.add(fullPath[n:])
#
# def visitDir(directory):
# tarFiles = []
# for relfile in os.listdir(directory):
# fullfile = os.path.join(directory, relfile)
# include = False
# if os.path.isdir(fullfile):
# if relfile != CCPN_ARCHIVES_DIRECTORY or includeArchives:
# tarFiles.extend(visitDir(fullfile))
# elif relfile.endswith('.xml'):
# include = True
# elif includeBackups and relfile.endswith('.xml.bak'):
# include = True
# elif includeLogs and relfile.endswith('.txt') and os.path.basename(directory) == ApiPath.CCPN_LOGS_DIRECTORY:
# include = True
# elif includeSummaries and os.path.basename(directory) == ApiPath.CCPN_SUMMARIES_DIRECTORY:
# include = True
# elif includeData and fullfile in includedDataPaths:
# include = True
#
# if include:
# project._logger.info(fullfile)
# tarFiles.append(fullfile)
#
# if tarFiles:
# tarFiles.insert(0, directory)
#
# return tarFiles
#
# tarFileName = '%s.tgz' % filePrefix
# tarDirectory = os.path.dirname(tarFileName)
# if not os.path.exists(tarDirectory):
# os.makedirs(tarDirectory)
# tarFp = tarfile.open(tarFileName, 'w:gz')
#
# cwd = os.getcwd()
# os.chdir(os.path.dirname(projectPath))
#
# try:
# project._logger.info('Files included in tar file:')
# files = visitDir(os.path.basename(projectPath))
# for tarFile in files:
# tarFp.add(tarFile, recursive=False)
# finally:
# os.chdir(cwd)
# tarFp.close()
#
# return tarFileName
[docs]def findCcpXmlFile(project,packageName,fileSearchString):
"""
Finds an XML file by a file search pattern from all available package repositories
"""
ff = project.findFirstPackageLocator
packageLocator = ff(targetName=packageName) or ff(targetName='any')
xmlFileName = None
# The repositories link is ordered!
for repository in packageLocator.repositories:
fileLocation = repository.getFileLocation(packageName)
xmlFileNameMatches = glob.glob(os.path.join(fileLocation,fileSearchString))
if xmlFileNameMatches:
if xmlFileNameMatches[-1][-4:] == '.xml':
xmlFileName = xmlFileNameMatches[-1]
break
return xmlFileName
[docs]def getRepositoryPath(project, repositoryName):
repository = project.findFirstRepository(name=repositoryName)
if repository:
path = repository.url.path
else:
path = None
return path
[docs]def setRepositoryPath(project, repositoryName, path):
from ccpnmodel.ccpncore.api.memops.Implementation import Url
repository = project.findFirstRepository(name=repositoryName)
if repository:
if path != repository.url.path:
# TBD: should we copy anything over from old url?
url = Url(path=Path.normalisePath(path))
repository.url = url
# TBD: should we throw an exception if repository is not found?
[docs]def movePackageData(root, newPackageName, oldPackageName):
"""Move all data from package oldPackageName to newPackageName"""
ff = root.findFirstPackageLocator
repositories = (ff(targetName=newPackageName) or ff(targetName='any')).repositories
newLocations = [x.getFileLocation(newPackageName) for x in repositories]
# If there were data in correct location assume that project is fixed already
if not any(x for x in newLocations if os.path.isdir(x)):
repositories = (ff(targetName=oldPackageName) or ff(targetName='any')).repositories
oldLocations = [x.getFileLocation(oldPackageName) for x in repositories]
oldLocations = [x for x in oldLocations if os.path.isdir(x)]
if oldLocations:
# there were old data in the wrong location. Move to new one.
newDir = newLocations[-1]
os.makedirs(newDir)
for oldDir in oldLocations:
for filename in os.listdir(oldDir):
if filename.startswith('.'):
# skip hidden files (on *nix/Mac)
continue
elif filename.endswith('.xml'):
shutil.move(os.path.join(oldDir, filename), os.path.join(newDir, filename))
def _initialiseStandardDataLocationStore(memopsRoot:Implementation.MemopsRoot):
"""Get or create standard DataLocationStore, and reset standard data location urls
to current location.
Called from MemopsRoot.__init__"""
result = memopsRoot.findFirstDataLocationStore(name='standard')
if result is None:
# Normal case make the DataLocationStore and contents
result = memopsRoot.newDataLocationStore(name='standard')
# insideData = data inside the project
dataUrlObject = result.findFirstDataUrl(name='insideData')
projectUrl = memopsRoot.findFirstRepository(name='userData').url
if dataUrlObject is None:
# make new dataUrl
result.newDataUrl(name='insideData', url=projectUrl)
else:
dataUrlObject.url = projectUrl
# alongsideData - points to directory containing project directory
dataUrlObject = result.findFirstDataUrl(name='alongsideData')
path, junk = Path.splitPath(projectUrl.path)
newUrl = Implementation.Url(path=path)
if dataUrlObject is None:
# make new dataUrl
result.newDataUrl(name='alongsideData', url=newUrl)
elif path != dataUrlObject.url.dataLocation:
# Update only if the alongside path has changed
pointToExisting = [x for x in dataUrlObject.dataStores if os.path.exists(x.fullPath)]
if pointToExisting:
# If there are files in the old alongsideData that still exist, (typically if the
# project is moved elsewhere within the same file system) move them to a different
# dataUrl object so as not to break the link.
# If project comes from a different computer this will not normally happen
newName = 'alongsideData'
while result.findFirstDataUrl(name=newName):
newName = commonUtil.incrementName(newName)
newDataUrlObject = result.newDataUrl(name=newName, url=dataUrlObject.url)
for dataStore in pointToExisting:
dataStore.dataUrl = newDataUrlObject
#
dataUrlObject.url = newUrl
# remoteData - initialised to home directory and not reset
dataUrlObject = result.findFirstDataUrl(name='remoteData')
path = os.path.expanduser('~')
# NOTE:ED - store in api as posixpath?
_path = str(path.replace(os.sep, '/'))
if dataUrlObject is None:
# make new dataUrl
result.newDataUrl(name='remoteData', url=Implementation.Url(path=_path))
#
return result
def _compressDataLocations(memopsRoot:Implementation.MemopsRoot):
"""Reorganise DataLocations to use standard DataUrls"""
standardStore = memopsRoot.findFirstDataLocationStore(name='standard')
locationData = []
standardTags = ('insideData', 'alongsideData', 'remoteData')
for tag in standardTags:
dataUrl = standardStore.findFirstDataUrl(name=tag)
locationData.append((os.path.join(dataUrl.url.path, ''), dataUrl))
standardUrls = [tt[1] for tt in locationData]
for dataLocationStore in memopsRoot.dataLocationStores:
for dataUrl in dataLocationStore.dataUrls:
if dataUrl not in standardUrls:
for dataStore in dataUrl.dataStores:
fullPath = dataStore.fullPath
for directory, targetUrl in locationData:
if fullPath.startswith(directory):
dataStore.repointToDataUrl(targetUrl)
dataStore.path = fullPath[len(directory):]
break
[docs]def copyV2ToV3Location(projectPath) -> str:
"""Copy V2 data to new directory with correct name and structure for V3
If project is already V3 does nothing (except converting 'xyz.ccpn/ccpn' to 'xyz.ccpn'"""
apiDirNames = ('memops', 'ccp', 'ccpnmr', 'cambridge', 'molsim', 'utrecht' )
logger = Logging.getLogger()
if projectPath.endswith(CCPN_DIRECTORY_SUFFIX):
version = 'V3'
else:
pt,dr = os.path.split(projectPath)
if dr == Path.CCPN_API_DIRECTORY and pt.endswith(CCPN_DIRECTORY_SUFFIX):
logger.debug("Interpreting path %s as project path %s" % (projectPath, pt))
projectPath = pt
version = 'V3'
else:
version = 'V2'
if not os.path.isdir(projectPath):
raise IOError("No directory named %s" % projectPath)
if version == 'V2':
newProjectPath = addCcpnDirectorySuffix(projectPath)
ii = 0
while os.path.exists(newProjectPath):
ii += 1
newProjectPath = addCcpnDirectorySuffix('%s_%s' % (projectPath, ii))
logger.info(
'Copying directory %s to %s' % (projectPath, newProjectPath))
newApiFileDir = os.path.join(newProjectPath, Path.CCPN_API_DIRECTORY)
if not os.path.exists(newApiFileDir):
os.makedirs(newApiFileDir)
for name in os.listdir(projectPath):
source = os.path.join(projectPath, name)
if name in apiDirNames:
target = os.path.join(newApiFileDir, name)
else:
target = os.path.join(newProjectPath, name)
#
if os.path.isdir(source):
shutil.copytree(source, target)
else:
shutil.copyfile(source, target)
#
return newProjectPath
else:
#
return projectPath