"""Module Documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Geerten Vuister $"
__dateModified__ = "$dateModified: 2022-02-07 17:13:53 +0000 (Mon, February 07, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================
#===========================================================================================================
# Decorators
#===========================================================================================================
import sys
import os
import linecache
import functools
import cProfile
import decorator
import inspect
import time
from functools import partial
from ccpn.util.SafeFilename import getSafeFilename
from ccpn.util.Path import aPath, Path
import ccpn.util.Logging as Logging
[docs]def trace(f):
def globaltrace(frame, why, arg):
if why == "call":
return localtrace
return None
def localtrace(frame, why, arg):
if why == "line":
# record the file name and line number of every trace
filename = frame.f_code.co_filename
lineno = frame.f_lineno
bname = os.path.basename(filename)
sys.stderr.write("{}({}): {}".format(bname,
lineno,
linecache.getline(filename, lineno)),
)
return localtrace
def _f(*args, **kwds):
sys.settrace(globaltrace)
result = f(*args, **kwds)
sys.settrace(None)
return result
return _f
[docs]def singleton(cls):
""" Use class as singleton.
From: https://wiki.python.org/moin/PythonDecoratorLibrary#Singleton
Annotated by GWV
Modified by EJB to keep record of all singletons
- these can later be destroyed, was causing leakage between running testcases
"""
@functools.wraps(cls.__new__)
def singleton_new(cls, *args, **kwds):
# keep a list of already created singletons
if not hasattr(singleton, '_instances'):
singleton._instances = {}
# check if it already exists
it = singleton._instances.get(cls)
if it is not None:
return it
# it did not yet exist; generate an instance, store in list
singleton._instances[cls] = it = cls.__new_original__(cls, *args, **kwds)
it.__init_original__(*args, **kwds)
return it
# keep the new method and replace by singleton_new
cls.__new_original__ = cls.__new__
cls.__new__ = singleton_new
# keep the init method and replace by the object init
cls.__init_original__ = cls.__init__
cls.__init__ = object.__init__
return cls
[docs]def pstatToText(pstatPath, outPath=None):
"""
Converts a profile file of type .pstat into a plain text file.
:param pstatPath: path of the pstat file. (The output of the decorator "profile")
:param outPath: output pat. Including the file name and extension ".txt".
If None, saves in the same location of pstat with same original name
:return: the stats object for the file
"""
import pstats
import io
s = io.StringIO()
ps = pstats.Stats(pstatPath, stream=s).sort_stats('tottime')
ps.print_stats()
if not outPath:
outPath = pstatPath.replace('pstat', 'txt')
with open(outPath, 'w+') as f:
f.write(s.getvalue())
return ps
[docs]def profile(dirPath='~', asText=False):
"""
Get the stats of all related calls firing from inside a specific function/method.
Add on top of a function/method to profile it. E.g.:
@profile(dirPath='/myDesktopPath/')
def my function(*args): ...
:param dirPath: str, dir where to dump the pstat file.
:param asText: bool. Make a pstat copy as a human readable text file.
"""
def _profile(func):
@functools.wraps(func)
def profileWrapper(*args, **kwargs):
profiler = cProfile.Profile()
try:
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
return result
finally:
filename = aPath(dirPath).joinpath(func.__name__ + '.pstat')
filename = getSafeFilename(filename, 'w')
profiler.dump_stats(filename)
if asText:
pstatToText(str(filename))
return profileWrapper
return _profile
[docs]def notify(trigger, preExecution=False):
"""A decorator wrap a method around a notification blanking with explicit notification pre- or post-execution
"""
trigger = 'change' if trigger == 'observe' else trigger
@decorator.decorator
def theDecorator(*args, **kwds):
func = args[0]
args = args[1:] # Optional 'self' is now args[0]
self = args[0]
project = self.project # we need a reference now, as the func could be deleting the obj
if preExecution:
# call the notification
self._finaliseAction(trigger)
# Execute the function with blanked notification
project.blankNotification()
result = func(*args, **kwds)
project.unblankNotification()
if not preExecution:
# call the notification
self._finaliseAction(trigger)
return result
return theDecorator
[docs]def propertyUndo():
"""A decorator to wrap a method in an undo block
Requires that the 'self' has 'project' as an attribute
"""
from ccpn.core.lib.ContextManagers import undoBlock
@decorator.decorator
def theDecorator(*args, **kwds):
func = args[0]
args = args[1:] # Optional 'self' is now args[0]
self = args[0]
_undo = self.project._undo
with undoBlock():
# Execute the function while blocking all additions to the call undo stack
_undo.increaseBlocking()
# remember the old value, requires a property getter
oldValue = getattr(self, func.__name__)
# call the wrapped function
result = func(*args, **kwds)
_undo.decreaseBlocking()
# add the wrapped function to the undo stack
_undo._newItem(undoPartial=partial(func, self, oldValue),
redoPartial=partial(func, *args, **kwds))
return result
return theDecorator
[docs]def callList(func):
"""
Decorator to give the realtime call stack for the decorated function.
Adds _callList=None, _callStr=None to the parameter list for the function call
so function can access full list.
_callList is tuple of tuples of the form:
((caller info, simple print string), string)
caller info is: (index, name of calling method, stack info)
simple print string is repr if caller info.
The function will need either _callList=None, or **kwds adding to the parameter list.
# Not fully tested
"""
def inner(*args, **kwargs):
stack = inspect.stack()
minStack = len(stack) # min(stack_size, len(stack))
modules = [(index, inspect.getmodule(stack[index][0]))
for index in range(1, minStack)]
callers = [(0, func.__module__, func.__name__)]
for index, module in modules:
try:
name = module.__name__
except:
name = '<NOT_FOUND>'
callers.append((index, name, stack[index][3]))
s = '{index:>5} : {module:^%i} : {name}' % 20
printStr = []
for i in range(0, len(callers)):
printStr.append(s.format(index=callers[i][0], module=callers[i][1], name=callers[i][2]))
kwargs['_callList'] = tuple((cc, pp) for cc, pp in zip(callers, printStr))
return func(*args, **kwargs)
return inner
#----------------------------------------------------------------------------------------------
# Adapted from from sandbox.Geerten.Refactored.decorators to fit current setup
#----------------------------------------------------------------------------------------------
def _obj2pid(obj):
"""
Convert any core objects and CcpnModules to pids;
expand list, tuples, dicts but don't use recursion
Convert Path to str
CCPNINTERNAL: also used in logCommandManager contextmanager
"""
# local import to prevent circular import
from ccpn.core._implementation.AbstractWrapperObject import AbstractWrapperObject
from ccpn.ui.gui.modules.CcpnModule import CcpnModule
if isinstance(obj, (AbstractWrapperObject, CcpnModule)):
obj = obj.pid
elif isinstance(obj, list):
_tmp = []
for itm in obj:
if isinstance(itm, (AbstractWrapperObject, CcpnModule)):
_tmp.append(itm.pid)
else:
_tmp.append(itm)
obj = _tmp
elif isinstance(obj, tuple):
_tmp = []
for itm in obj:
if isinstance(itm, (AbstractWrapperObject, CcpnModule)):
_tmp.append(itm.pid)
else:
_tmp.append(itm)
obj = tuple(_tmp)
elif isinstance(obj, dict):
_tmp = {}
for key, value in obj.items():
if isinstance(value, (AbstractWrapperObject, CcpnModule)):
_tmp[key] = value.pid
else:
_tmp[key] = value
obj = _tmp
elif isinstance(obj, Path):
obj = str(obj)
return obj
def _makeLogString(prefix, addSelf, func, *args, **kwds):
"""Helper function to create the log string from func, args and kwds
returns string:
if addSelf == False:
prefix+func.__name__(EXPANDED-ARGUMENTS)
if addSelf == True
prefix+CLASSNAME-of-SELF+'.'+func.__name__(EXPANDED-ARGUMENTS)
"""
# get the signature
sig = inspect.signature(func)
# fill in the missing parameters
ba = sig.bind(*args, **kwds)
ba.apply_defaults()
# get the parameters kinds that determine how to print them
kinds = dict([(pName, p.kind) for pName, p in sig.parameters.items()])
if 'self' in ba.arguments or 'cls' in ba.arguments:
# we skip the first 'self' or 'cls' in the argument list
pNames = list(ba.arguments.keys())[1:]
else:
pNames = list(ba.arguments.keys())
# make a string for each parameter
pStrings = []
for pName in pNames:
pValue = ba.arguments[pName]
if kinds[pName] == inspect.Parameter.VAR_POSITIONAL: # variable arguments
pStrings.extend([repr(_obj2pid(p)) for p in pValue])
elif kinds[pName] == inspect.Parameter.VAR_KEYWORD: # variable keywords
pStrings.extend(['{0!s}={1!r}'.format(k, _obj2pid(v)) for (k, v) in pValue.items()])
elif kinds[pName] == inspect.Parameter.POSITIONAL_ONLY:
pStrings.append(repr(_obj2pid(pValue)))
elif kinds[pName] == inspect.Parameter.KEYWORD_ONLY or \
kinds[pName] == inspect.Parameter.POSITIONAL_OR_KEYWORD: # # keywords or positional keywords
pStrings.append('{0!s}={1!r}'.format(pName, _obj2pid(pValue)))
if ('self' in ba.arguments or 'cls' in ba.arguments) and addSelf:
logString = prefix + '%s.%s' % (args[0].__class__.__name__, func.__name__)
else:
logString = prefix + '%s' % (func.__name__,)
logString += '(%s)' % ', '.join(pStrings)
return logString
[docs]def quickCache(func):
"""Class to implement a quick caching decorator
For speed, only the first argument of the wrapped function is taken as the key
"""
cache = {}
def _cacheFunc(*args, **kwds):
try:
return cache[args[0]]
except:
cache[args[0]] = result = func(*args, **kwds)
return result
def cacheClearItem(item):
if item in cache:
del cache[item]
# attach external methods to _cacheFunc
# must be done like this, as internal functions are only created at runtime
_cacheFunc.cacheClear = lambda: cache.clear()
_cacheFunc.cachePrint = lambda: print(f'>>> {cache}')
_cacheFunc.cacheClearItem = cacheClearItem
return _cacheFunc
@quickCache
def _inspectFunc(func):
"""Function to return the module.function:lineNo of the wrapped function
"""
# this is cached to speed up the get_ methods (cache may have to be cleared if modules are reloaded)
# but can be cleared with _inspectFunc.cacheClear()
_, _line = inspect.getsourcelines(func)
_file = aPath(inspect.getsourcefile(func)).basename
return f'({_file}.{func.__name__}:{_line + 1})'
[docs]def logCommand(prefix='', get=None, isProperty=False):
"""A decorator to log the invocation of the call to a Framework, Project, ... method.
Use prefix to set the proper command context, e.g. 'application.' or 'project.'
Use isProperty to get ' = 'args[1]
"""
from ccpn.core.lib.ContextManagers import notificationEchoBlocking # local to prevent circular imports
@decorator.decorator
def theDecorator(*args, **kwds):
# to avoid potential conflicts with potential 'func' named keywords
func = args[0]
args = args[1:] # Optional 'self' is now args[0]
self = args[0]
# application = self.project.application
# GWV: tried this for application.newProject decoration, but unsuccessful (for now)
from ccpn.framework.Application import getApplication
application = getApplication()
if application is None:
raise RuntimeError('Error getting application')
blocking = application._echoBlocking
if blocking == 0 and application.ui is not None:
_pref = prefix
if get == 'self':
_pref += "get('%s')." % args[0].pid
if isProperty:
try:
# if arg is a core object then use get(pid)
logS = _pref + "%s = get('%s')" % (func.__name__, args[1].pid)
except:
logS = _pref + '%s = %r' % (func.__name__, args[1])
else:
logS = _makeLogString(_pref, False, func, *args, **kwds)
# get the trace for the func method and append to log string (cached function; may need to be cleared?)
# this has been removed from the logger.formatting and moved to the logging methods
_trace = _inspectFunc(func)
msg = f'{logS:90} {_trace}'
application.ui.echoCommands([msg])
# increase blocking
with notificationEchoBlocking(application=application):
result = func(*args, **kwds)
return result
return theDecorator
[docs]def timeDecorator(method):
"""calculate execution time of a function/method
"""
def timed(*args, **kw):
ts = time.time()
result = method(*args, **kw)
te = time.time()
if 'log_time' in kw:
name = kw.get('log_name', method.__name__.upper())
kw['log_time'][name] = int((te - ts) * 1000)
else:
print('%r %2.2f ms' % (method.__name__, (te - ts) * 1000))
return result
return timed
[docs]def timeitDecorator(method):
"""calculate execution time of a function/method
"""
def timed(*args, **kwds):
ts = time.time()
result = method(*args, **kwds)
te = time.time()
final = te - ts
m = 'Execution time for %r: %.3f ms'
print(m % (method.__name__, final))
return result
return timed
[docs]def debugEnter(verbosityLevel=Logging.DEBUG1):
"""A decorator to log the invocation of the call
"""
@decorator.decorator
def decoratedFunc(*args, **kwds):
# def debugEnter(func, *args, **kwds):
# to avoid potential conflicts with potential 'func' named keywords
func = args[0]
args = args[1:]
logs = _makeLogString('ENTERING: ', True, func, *args, **kwds)
# get a logger and call the correct routine depending on verbosityLevel
logger = Logging.getLogger()
if verbosityLevel == Logging.DEBUG1:
logger.debug(logs)
elif verbosityLevel == Logging.DEBUG2:
logger.debug2(logs)
elif verbosityLevel == Logging.DEBUG3:
logger.debug3(logs)
else:
raise ValueError('invalid verbosityLevel "%s"' % verbosityLevel)
# execute the function and return the result
return func(*args, **kwds)
return decoratedFunc
[docs]def debug1Enter():
"""Convenience"""
return debugEnter(verbosityLevel=Logging.DEBUG1)
[docs]def debug2Enter():
"""Convenience"""
return debugEnter(verbosityLevel=Logging.DEBUG2)
[docs]def debug3Enter():
"""Convenience"""
return debugEnter(verbosityLevel=Logging.DEBUG3)
[docs]def debugLeave(verbosityLevel=Logging.DEBUG1):
"""A decorator to log the invocation of the call
"""
@decorator.decorator
def decoratedFunc(*args, **kwds):
# def debugLeave(func, *args, **kwds):
# to avoid potential conflicts with potential 'func' named keywords
func = args[0]
args = args[1:]
ba = inspect.signature(func).bind(*args, **kwds)
ba.apply_defaults()
allArgs = ba.arguments
#execute the function
result = func(*args, **kwds)
if 'self' in allArgs or 'cls' in allArgs:
logs = 'LEAVING: %s.%s(); result=%r' % \
(args[0].__class__.__name__, func.__name__, result)
else:
logs = 'LEAVING: %s(); result=%r' % (func.__name__, result)
# get a logger and call the correct routine depending on verbosityLevel
logger = Logging.getLogger()
if verbosityLevel == Logging.DEBUG1:
logger.debug(logs)
elif verbosityLevel == Logging.DEBUG2:
logger.debug2(logs)
elif verbosityLevel == Logging.DEBUG3:
logger.debug3(logs)
else:
raise ValueError('invalid verbosityLevel "%s"' % verbosityLevel)
#return the function result
return result
return decoratedFunc
[docs]def debug1Leave():
"""Convenience"""
return debugLeave(verbosityLevel=Logging.DEBUG1)
[docs]def debug2Leave():
"""Convenience"""
return debugLeave(verbosityLevel=Logging.DEBUG2)
[docs]def debug3Leave():
"""Convenience"""
return debugLeave(verbosityLevel=Logging.DEBUG3)
#==========================================================================================================================
# testing
#==========================================================================================================================
if __name__ == '__main__':
def func(par, *args, flag=False, **kwds):
sig = inspect.signature(func) # get the signature
ba = sig.bind(par, *args, flag=flag, **kwds)
ba.apply_defaults() # fill in the missing parameters
kinds = dict([(pName, p.kind) for pName, p in sig.parameters.items()]) # get the parameters kinds that determine
# how to print them
pStrings = []
for pName, pValue in ba.arguments.items():
if kinds[pName] == inspect.Parameter.VAR_POSITIONAL: # variable argument
pStrings.extend([repr(p) for p in pValue])
elif kinds[pName] == inspect.Parameter.VAR_KEYWORD: # variable keywords
pStrings.extend(['{0!s}={1!r}'.format(k, v) for (k, v) in pValue.items()])
elif kinds[pName] == inspect.Parameter.POSITIONAL_ONLY or \
kinds[pName] == inspect.Parameter.POSITIONAL_OR_KEYWORD: # positional keywords
pStrings.append(repr(pValue))
elif kinds[pName] == inspect.Parameter.KEYWORD_ONLY: # keywords
pStrings.append('{0!s}={1!r}'.format(pName, pValue))
print(', '.join(pStrings))
func('test', 1, 2, myPar='myValue')