Source code for ccpn.util.Update

"""
Module Documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-17 10:35:33 +0000 (Thu, March 17, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-07-04 15:21:16 +0000 (Tue, July 04, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

import difflib
import hashlib
import os
import shutil
import sys
from datetime import datetime
import json


ccpn2Url = 'http://www.ccpn.ac.uk'

from ccpn.util import Path


SERVER = ccpn2Url + '/'
SERVER_DB_ROOT = 'ccpNmrUpdate'
SERVER_DB_FILE = '__UpdateData.db'
# the reason to use a CGI script just to download a file is because of exception handling
# when you just fetch a URL you always get a response but how do you know it is valid
# (and not a 404 or whatever)
SERVER_DOWNLOAD_SCRIPT = 'cgi-bin/update/downloadFile'
SERVER_UPLOAD_SCRIPT = 'cgi-bin/updateadmin/uploadFileVerify'
SERVER_DOWNLOADCHECK_SCRIPT = 'cgi-bin/register/downloadFileCheckV3'

FIELD_SEP = '\t'
PATH_SEP = '__sep_'
WHITESPACE_AND_NULL = {'\x00', '\t', '\n', '\r', '\x0b', '\x0c'}

# require only . and numbers and at least one of these
# 23 Nov 2015: remove below RE because version can have letter in it, so just do exact match
###VERSION_RE = re.compile('^[.\d]+$')

BAD_DOWNLOAD = 'Exception: '
ERROR_DOWNLOAD = 'Error: '
DELETEHASHCODE = '<DELETE>'
TERMSANDCONDITIONS = 'termsConditions'

VERSION_UPDATE_FILE = 'src/python/ccpn/framework/Version.py'


[docs]def lastModifiedTime(filePath): if not os.path.isfile(filePath): return 0 if not os.access(filePath, os.R_OK): return 0 return os.stat(filePath).st_mtime
[docs]def isBinaryFile(fileName): """Check whether the fileName is a binary file (not always guaranteed) Doesn't check for a fullPath Returns False if the file does not exist """ if os.path.isfile(fileName): with open(fileName, 'rb') as fileObj: # read the first 1024 bytes of the file firstData = fileObj.read(1024) # remove all characters that are considered as text textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f}) isBinary = bool(firstData.translate(None, textchars)) return isBinary
[docs]def isBinaryData(data): """Check whether the byte-string is binary """ if data: # check the first 1024 bytes of the file firstData = data[0:max(1024, len(data))] try: firstData = bytearray(firstData) except: firstData = bytearray(firstData, encoding='utf-8') # remove all characters that are considered as text textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f}) isBinary = bool(firstData.translate(None, textchars)) return isBinary
[docs]def calcHashCode(filePath): if not os.path.isfile(filePath): return 0 if not os.access(filePath, os.R_OK): return 0 try: if isBinaryFile(filePath): with open(filePath, 'rb') as fp: data = fp.read() else: with open(filePath, 'r', encoding='utf-8') as fp: data = fp.read() data = bytes(data, 'utf-8') except Exception as es: data = '' h = hashlib.md5() h.update(data) return h.hexdigest()
[docs]def fetchUrl(url, data=None, headers=None, timeout=2.0, decodeResponse=True): """Fetch url request from the server """ from ccpn.util.Url import fetchHttpResponse from ccpn.util.UserPreferences import UserPreferences try: _userPreferences = UserPreferences(readPreferences=True) proxySettings = None if _userPreferences.proxyDefined: proxyNames = ['useProxy', 'proxyAddress', 'proxyPort', 'useProxyPassword', 'proxyUsername', 'proxyPassword', 'verifySSL'] proxySettings = {} for name in proxyNames: proxySettings[name] = _userPreferences._getPreferencesParameter(name) response = fetchHttpResponse('POST', url, data, headers=headers, proxySettings=proxySettings) # if response: # ll = len(response.data) # print('>>>>>>responseUpdate', proxySettings, response.data[0:min(ll, 20)]) return response.data.decode('utf-8') if decodeResponse else response except: print('Error fetching Url.')
[docs]def downloadFile(serverScript, serverDbRoot, fileName): """Download a file from the server """ # fileName = os.path.join(serverDbRoot, fileName) fileName = '/'.join([serverDbRoot, fileName]) try: values = {'fileName': fileName} response = fetchUrl(serverScript, values, decodeResponse=False) data = response.data if isBinaryData(data): result = data else: result = data.decode('utf-8') if result.startswith(BAD_DOWNLOAD): ll = len(result) bd = len(BAD_DOWNLOAD) print(str(result[min(ll, bd):min(ll, bd + 50)])) return return result except Exception as es: print('Error downloading file from server.')
[docs]def installUpdates(version, dryRun=True): updateAgent = UpdateAgent(version, dryRun=dryRun) updateAgent.resetFromServer() updateAgent.installUpdates() if updateAgent._check(): updateAgent._resetMd5()
[docs]class UpdateFile: def __init__(self, installLocation, serverDbRoot, filePath, fileServerTime=None, fileStoredAs=None, fileHashCode=None, shouldInstall=True, shouldCommit=False, isNew=False, serverDownloadScript=None, serverUploadScript=None): # self.fullFilePath = os.path.join(installLocation, filePath) self.fullFilePath = os.path.abspath(os.path.join(installLocation, filePath)) if fileServerTime: fileServerTime = float(fileServerTime) # from server it comes as a string if not fileStoredAs: fileStoredAs = PATH_SEP.join(filePath.split('/')) if not fileHashCode: fileHashCode = calcHashCode(self.fullFilePath) self.installLocation = installLocation self.serverDbRoot = serverDbRoot self.filePath = filePath self.fileServerTime = fileServerTime self.fileStoredAs = fileStoredAs self.fileHashCode = fileHashCode self.shouldInstall = shouldInstall self.shouldCommit = shouldCommit self.isNew = isNew self.serverDownloadScript = serverDownloadScript self.serverUploadScript = serverUploadScript self.fileLocalTime = lastModifiedTime(self.fullFilePath) self.fileServerDateTime = str(datetime.fromtimestamp(fileServerTime)) if fileServerTime else '' self.fileLocalDateTime = str(datetime.fromtimestamp(self.fileLocalTime)) self.fileName = os.path.basename(filePath) self.fileDir = os.path.dirname(filePath)
[docs] def installUpdate(self): """Install updated file """ data = downloadFile(self.serverDownloadScript, self.serverDbRoot, self.fileStoredAs) if not data: return fullFilePath = self.fullFilePath if os.path.isfile(fullFilePath): # backup what is there just in case shutil.copyfile(fullFilePath, fullFilePath + '__old') else: directory = os.path.dirname(fullFilePath) if not os.path.exists(directory): os.makedirs(directory) try: if isBinaryData(data): # always write binary files with open(fullFilePath, 'wb') as fp: fp.write(data) else: # backwards compatible check for half-updated - file contains DELETEHASHCODE as text if data and data.startswith(DELETEHASHCODE): os.remove(fullFilePath) else: lastHashCode = calcHashCode(fullFilePath) if os.path.isfile(fullFilePath) else '<None>' with open(fullFilePath, 'w', encoding='utf-8') as fp: fp.write(data) # generate the hashcode for the new file here currentHashCode = calcHashCode(fullFilePath) serverHashCode = self.fileHashCode _hashCodeCacheFolder = os.path.abspath(os.path.join(self.installLocation, '.cache')) _hashCodeCache = os.path.join(_hashCodeCacheFolder, '_hashCodeCache.json') if not os.path.exists(_hashCodeCache): if not os.path.exists(_hashCodeCacheFolder): os.makedirs(_hashCodeCacheFolder) data = {} else: with open(_hashCodeCache) as fp: data = json.load(fp) if currentHashCode != serverHashCode: # should only store for windows if lastHashCode in data and lastHashCode != currentHashCode: del data[lastHashCode] data[currentHashCode] = serverHashCode with open(_hashCodeCache, 'w') as fp: json.dump(data, fp, indent=4) return True except Exception as es: pass
[docs] def installDeleteUpdate(self): """Remove file as update action """ fullFilePath = self.fullFilePath try: os.remove(fullFilePath) return True except OSError: pass
# def commitUpdate(self, serverUser, serverPassword): # # uploadFile(serverUser, serverPassword, self.serverUploadScript, self.fullFilePath, self.serverDbRoot, # self.fileStoredAs) # self.fileHashCode = calcHashCode(self.fullFilePath)
[docs]class UpdateAgent(object): def __init__(self, version, showError=None, showInfo=None, askPassword=None, serverUser=None, server=SERVER, serverDbRoot=SERVER_DB_ROOT, serverDbFile=SERVER_DB_FILE, serverDownloadScript=SERVER_DOWNLOAD_SCRIPT, serverUploadScript=SERVER_UPLOAD_SCRIPT, _updateProgressHandler=None, dryRun=True): if not showError: # showError = MessageDialog.showError showError = lambda title, msg: print(msg) if not showInfo: # showInfo = MessageDialog.showInfo showInfo = lambda title, msg: print(msg) # if not askPassword: # askPassword = InputDialog.askPassword self.version = version self.showError = showError self.showInfo = showInfo self.askPassword = askPassword self.serverUser = serverUser # None for downloads, not None for uploads self.server = server self.serverDbRoot = '%s%s' % (serverDbRoot, version) self.serverDbFile = serverDbFile self.serverDownloadScript = serverDownloadScript self._serverDownloadCheckScript = SERVER_DOWNLOADCHECK_SCRIPT self.serverUploadScript = serverUploadScript # self.serverDownloadScript = '%s%s' % (server, serverDownloadScript) # self.serverUploadScript = '%s%s' % (server, serverUploadScript) self.installLocation = Path.getTopDirectory() self.updateFiles = [] self.updateFileDict = {} self._found = None self._updateProgressHandler = _updateProgressHandler self._dryRun = dryRun
[docs] def checkNumberUpdates(self): self.fetchUpdateDb() return len(self.updateFiles)
[docs] def fetchUpdateDb(self): """Fetch list of updates from server.""" self.updateFiles = updateFiles = [] self.updateFileDict = updateFileDict = {} serverDownloadScript = '%s%s' % (self.server, self.serverDownloadScript) serverUploadScript = '%s%s' % (self.server, self.serverUploadScript) data = downloadFile(serverDownloadScript, self.serverDbRoot, self.serverDbFile) if not data: return if data.startswith(BAD_DOWNLOAD): self.showError('fetching updates', f'Error: Could not download database file from server - {data}') return if data.startswith(ERROR_DOWNLOAD): self.showError('fetching updates', data) return lines = data.split('\n') if lines: version = lines[0].strip() if version != self.version: self.showError('fetching updates', 'Error: Server database version => %s != %s' % (version, self.version)) return for line in lines[1:]: line = line.rstrip() if line: (filePath, fileTime, fileStoredAs, fileHashCode) = line.split(FIELD_SEP) if fileHashCode == DELETEHASHCODE: # delete file if os.path.exists(os.path.join(self.installLocation, filePath)): # if still exists then need to add to update list updateFile = UpdateFile(self.installLocation, self.serverDbRoot, filePath, fileTime, fileStoredAs, fileHashCode, serverDownloadScript=serverDownloadScript, serverUploadScript=serverUploadScript) updateFiles.append(updateFile) updateFileDict[filePath] = updateFile elif self.serverUser or self.isUpdateDifferent(filePath, fileHashCode): # file exists, is modified and needs updating updateFile = UpdateFile(self.installLocation, self.serverDbRoot, filePath, fileTime, fileStoredAs, fileHashCode, serverDownloadScript=serverDownloadScript, serverUploadScript=serverUploadScript) updateFiles.append(updateFile) updateFileDict[filePath] = updateFile elif fileTime in [0, '0', '0.0']: # file exists, is modified and needs updating updateFile = UpdateFile(self.installLocation, self.serverDbRoot, filePath, fileTime, fileStoredAs, fileHashCode, serverDownloadScript=serverDownloadScript, serverUploadScript=serverUploadScript) updateFiles.append(updateFile) updateFileDict[filePath] = updateFile
[docs] def isUpdateDifferent(self, filePath, fileHashCode): """See if local file is different from server file.""" currentFilePath = os.path.join(self.installLocation, filePath) if os.path.exists(currentFilePath): currentHashCode = calcHashCode(currentFilePath) # get the translated hashcode from the json file in the .cache folder try: _hashCodeCacheFolder = os.path.abspath(os.path.join(self.installLocation, '.cache')) _hashCodeCache = os.path.join(_hashCodeCacheFolder, '_hashCodeCache.json') if os.path.exists(_hashCodeCache): with open(_hashCodeCache) as fp: data = json.load(fp) else: data = {} except Exception as es: data = {} if currentHashCode in data: currentHashCode = data[currentHashCode] isDifferent = (currentHashCode != fileHashCode) # below means that updates in new directories will be missed elif os.path.exists(os.path.dirname(currentFilePath)): isDifferent = True else: # NOTE:ED - was originally False to stop new directories being created, but needed now isDifferent = True return isDifferent
def _check(self): """Check the checkSum from the gui """ try: self._checkMd5() except: pass finally: return self._found is not None and 'valid' not in self._found def _checkMd5(self): """Check the checkSum status on the server """ serverDownloadScript = '%s%s' % (self.server, self._serverDownloadCheckScript) try: self._numAdditionalUpdates = 0 self._found = 'invalid' from ccpn.util.Register import userAttributes, loadDict, _otherAttributes, _insertRegistration registrationDict = loadDict() if not registrationDict: self.showError('Update error', 'Could not read registration details') return val2 = _insertRegistration(registrationDict) values = {} for attr in userAttributes + _otherAttributes: if attr in registrationDict: values[attr] = ''.join([c if 32 <= ord(c) < 128 else '_' for c in registrationDict[attr]]) values.update(val2) values['version'] = self.version self._found = fetchUrl(serverDownloadScript, values, timeout=2.0, decodeResponse=True) if isinstance(self._found, str): # file returns with EOF chars on the end self._found = self._found.rstrip('\r\n') except: self.showError('Update error', 'Could not check details on server.') def _resetMd5(self): # only write the file if it is non-empty if self._found: # from ccpn.util.UserPreferences import userPreferencesDirectory, ccpnConfigPath from ccpn.framework.PathsAndUrls import userPreferencesDirectory, ccpnConfigPath fname = ''.join([c for c in map(chr, (108, 105, 99, 101, 110, 99, 101, 75, 101, 121, 46, 116, 120, 116))]) lfile = os.path.join(userPreferencesDirectory, fname) if not os.path.exists(lfile): lfile = os.path.join(ccpnConfigPath, fname) msg = ''.join([c for c in map(chr, (117, 112, 100, 97, 116, 105, 110, 103, 32, 108, 105, 99, 101, 110, 99, 101))]) self.showInfo('installing', msg) with open(lfile, 'w', encoding='UTF-8') as fp: fp.write(self._found)
[docs] def resetFromServer(self): try: self.fetchUpdateDb() except Exception as e: self.showError('Update error', 'Could not fetch updates: %s' % e)
[docs] def addFiles(self, filePaths): serverDownloadScript = '%s%s' % (self.server, self.serverDownloadScript) serverUploadScript = '%s%s' % (self.server, self.serverUploadScript) installLocation = self.installLocation installErrorCount = 0 existsErrorCount = 0 for filePath in filePaths: if filePath.startswith(installLocation): filePath = filePath[len(installLocation) + 1:] if filePath in self.updateFileDict: updateFile = self.updateFileDict[filePath] updateFile.shouldCommit = True self.showInfo('Add Files', 'File %s already in updates' % filePath) existsErrorCount += 1 else: updateFile = UpdateFile(self.installLocation, self.serverDbRoot, filePath, shouldCommit=True, isNew=True, serverDownloadScript=serverDownloadScript, serverUploadScript=serverUploadScript) self.updateFiles.append(updateFile) self.updateFileDict[filePath] = updateFile else: self.showInfo('Ignoring Files', 'Ignoring "%s", not on installation path "%s"' % (filePath, installLocation)) installErrorCount += 1 if installErrorCount > 0: self.showError('Add file error', '%d file%s not added because not on installation path %s' % ( installErrorCount, installErrorCount > 1 and 's' or '', installLocation)) if existsErrorCount > 0: self.showError('Add file error', '%d file%s not added because already in update list (but now selected for committal)' % ( existsErrorCount, existsErrorCount > 1 and 's' or ''))
[docs] def haveWriteAccess(self): """See if can write files to local installation.""" testFile = os.path.join(self.installLocation, '__write_test__') try: with open(testFile, 'w'): pass os.remove(testFile) return True except: return False
[docs] def installChosen(self): """Download chosen server files to local installation.""" updateFiles = [updateFile for updateFile in self.updateFiles if updateFile.shouldInstall] if not updateFiles: self.showError('No updates', 'No updates for installation') return n = 0 updateFilesInstalled = [] if self.haveWriteAccess(): # # check that the last file to be updated is the Version.py # _allowVersionUpdate = True if (len(updateFiles) == 1 and updateFiles[0].filePath == VERSION_UPDATE_FILE) else False # go through the list is updates and apply each for updateFile in updateFiles: if self._updateProgressHandler: self._updateProgressHandler() # skip the version update file if updateFile.filePath == VERSION_UPDATE_FILE: continue # apply the update n = self._updateSingleFile(n, updateFile, updateFilesInstalled) # check how many have been updated correctly ss = n != 1 and 's' or '' if n != len(updateFiles): notInstalled = list(set(updateFilesInstalled) ^ set(updateFiles)) # check if only the version update file remains if notInstalled and len(notInstalled) == 1 and notInstalled[0].filePath == VERSION_UPDATE_FILE: n = self._updateSingleFile(n, notInstalled[0], updateFilesInstalled) # check whether the version update file installed correctly ss = n != 1 and 's' or '' if n == len(updateFiles): self.showInfo('Update%s installed' % ss, '%d update%s installed successfully' % (n, ss)) else: self.showError('Update problem', '%d update%s installed, %d not installed, see console for error messages' % (n, ss, len(updateFiles) - n)) else: self.showError('Update problem', '%d update%s installed, %d not installed, see console for error messages' % (n, ss, len(updateFiles) - n)) else: self.showInfo('Update%s installed' % ss, '%d update%s installed successfully' % (n, ss)) else: self.showError('No write permission', 'You do not have write permission in the CCPN installation directory') self.resetFromServer() return updateFilesInstalled
def _updateSingleFile(self, n, updateFile, updateFilesInstalled): try: if not self._dryRun: if updateFile.fileHashCode == DELETEHASHCODE: self.showInfo('Install Updates', 'Removing %s' % (updateFile.fullFilePath)) if not updateFile.installDeleteUpdate(): raise RuntimeError("error deleting original file") else: self.showInfo('Install Updates', 'Installing %s' % (updateFile.fullFilePath)) if not updateFile.installUpdate(): raise RuntimeError("error installing update") else: if updateFile.fileHashCode == DELETEHASHCODE: self.showInfo('Install Updates', 'dry-run Removing %s' % (updateFile.fullFilePath)) else: self.showInfo('Install Updates', 'dry-run Installing %s' % (updateFile.fullFilePath)) n += 1 updateFilesInstalled.append(updateFile) except Exception as e: self.showError('Install Error', 'Could not install %s: %s' % (updateFile.fullFilePath, e)) return n
[docs] def installUpdates(self): for updateFile in self.updateFiles: updateFile.shouldInstall = True return self.installChosen()
[docs] def diffUpdates(self, updateFiles=None, write=sys.stdout.write): if updateFiles is None: updateFiles = [] serverDownloadScript = '%s%s' % (self.server, self.serverDownloadScript) for updateFile in updateFiles: fullFilePath = updateFile.fullFilePath write(60 * '*' + '\n') write('Diff for %s\n' % fullFilePath) if os.path.exists(fullFilePath): if updateFile.isNew: write('No server copy of file\n') else: haveDiff = False with open(fullFilePath, 'rU', encoding='utf-8') as fp: localLines = fp.readlines() serverData = downloadFile(serverDownloadScript, self.serverDbRoot, updateFile.fileStoredAs) if serverData: serverLines = serverData.splitlines(True) for line in difflib.context_diff(localLines, serverLines, fromfile='local', tofile='server'): haveDiff = True write(line) if haveDiff: write('\n') else: write('No diff\n') else: write('No file on server') else: write('No local copy of file\n')
[docs]def testMain(): from ccpn.framework.Version import applicationVersion import sys import os # installUpdates(applicationVersion.withoutRelease(), dryRun=False) installUpdates(applicationVersion, dryRun=False) if sys.platform[:3].lower() == 'win': os._exit(0)
if __name__ == '__main__': testMain()