Source code for ccpn.core.lib.Undo

"""General undo handle supporting undo/redo stack

PyApiGen.py inserts the following line:

from ccpn.core.lib.Undo import _deleteAllApiObjects, restoreOriginalLinks, no_op

"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-01-13 17:30:48 +0000 (Thu, January 13, 2022) $"
__version__ = "$Revision: 3.0.4 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: CCPN $"
__date__ = "$Date: 2017-04-07 10:28:41 +0000 (Fri, April 07, 2017) $"
#=========================================================================================
# Start of code
#=========================================================================================

import sys
from enum import Enum
from functools import partial, update_wrapper
from collections import deque

from ccpn.util.Logging import getLogger
from ccpn.util.OrderedSet import OrderedSet


MAXUNDOWAYPOINTS = 50
MAXUNDOOPERATIONS = 10000


def _deleteAllApiObjects(objsToBeDeleted):
    """Delete all API objects in collection, together.
    Does NOT look for additional deletes or do any checks. Programmer beware!!!
    Does NOT do undo handling, as it is designed to be used within the Undo machinery
    """

    # CCPNINTERNAL
    # NBNB Use with EXTREME CARE, and make sure you get ALL API objects being created

    for obj in objsToBeDeleted:
        if (obj.__dict__.get('isDeleted')):
            raise ValueError("""%s: _deleteAllApiObjects
       called on deleted object""" % obj.qualifiedName
                             )

    for obj in objsToBeDeleted:
        for notify in obj.__class__._notifies.get('preDelete', ()):
            notify(obj)

    for obj in objsToBeDeleted:
        # objsToBeDeleted is passed in so that the references to the children of object are severed
        obj._singleDelete(objsToBeDeleted)

    # do Notifiers
    for obj in objsToBeDeleted:
        for notify in obj.__class__._notifies.get('delete', ()):
            notify(obj)





[docs]def no_op(): """Does nothing - for special undo situations where only one direction must act""" return
[docs]def resetUndo(memopsRoot, maxWaypoints=MAXUNDOWAYPOINTS, maxOperations=MAXUNDOOPERATIONS, debug: bool = False, application=None): """Set or reset undo stack, using passed-in parameters. NB setting either parameter to 0 removes the undo stack.""" undo = memopsRoot._undo if undo is not None: undo.clear() if maxWaypoints and maxOperations: memopsRoot._undo = Undo(maxWaypoints=maxWaypoints, maxOperations=maxOperations, debug=debug, application=application) else: memopsRoot._undo = None
[docs]class UndoEvents(Enum): UNDO_UNDO = 1 UNDO_REDO = 2 UNDO_CLEAR = 3 UNDO_ADD = 4 UNDO_MARK_SAVE = 5 UNDO_MARK_CLEAN = 6
[docs]class UndoObserver(): """ Class to store functions to call when undo stack operations are performed """ def __init__(self): self._callbacks = OrderedSet()
[docs] def add(self, callback): self._callbacks.add(callback)
[docs] def clear(self): self._callbacks.clear()
[docs] def remove(self, callback): if callback in self._callbacks: self._callbacks.remove(callback)
[docs] def call(self, action): for callback in self._callbacks: action(callback)
[docs]class Undo(deque): """Implementation of an undo and redo stack, with possibility of waypoints. A waypoint is the level at which an undo happens, and each of them could consist of multiple individual undo operations. To create a waypoint use newWaypoint(). """ # TODO: get rid of debug and use logging function instead def __init__(self, maxWaypoints=MAXUNDOWAYPOINTS, maxOperations=MAXUNDOOPERATIONS, debug=False, application=None): """Create Undo object with maximum stack length maxUndoCount""" self.maxWaypoints = maxWaypoints self.maxOperations = maxOperations self.nextIndex = 0 # points to next free slot (or first slot to redo) self.waypoints = [] # array of last item in each waypoint self._blocked = False # Block/unblock switch - internal use only self._undoItemBlockingLevel = 0 # Blocking level - modify with increase/decreaseBlocking only self._waypointBlockingLevel = 0 # Waypoint blocking - modify with increase/decreaseWaypointBlocking/ only self._newItemCount = 0 # the number of new items that have been added since the last new waypoint self._itemAtLastSave = None self._lastEventMarkClean = True self.undoChanged = UndoObserver() if maxWaypoints: self.newWaypoint() # DO NOT CHANGE THIS ONE deque.__init__(self) # Set to True to unblank errors during undo/redo self._debug = debug self.application = application @property def undoItemBlocking(self): """Undo blocking. If true (non-zero) undo setting is blocked. Allows multiple external functions to set blocking without trampling each other Modify with increaseBlocking/decreaseBlocking only""" return self._undoItemBlockingLevel > 0 @property def undoItemBlockingLevel(self): """Undo blocking Level. If true (non-zero) undo setting is blocked. Allows multiple external functions to set blocking without trampling each other Modify with increaseBlocking/decreaseBlocking only""" # needed for a single occurrence in api return self._undoItemBlockingLevel
[docs] def markSave(self): if len(self) > 0: lastItem = self.nextIndex - 1 try: self._itemAtLastSave = self[lastItem] except IndexError as ie: getLogger().debug('Error on markSave %s' % ie) self._lastEventMarkClean = True self.undoChanged.call(lambda x: x(UndoEvents.UNDO_MARK_SAVE))
[docs] def isDirty(self): result = False lastItem = self.nextIndex - 1 try: if len(self) > 0 and (self._itemAtLastSave == None): result = True elif len(self) > 0 and lastItem > 0 and (self[lastItem] != self._itemAtLastSave): result = True except IndexError as ie: getLogger().debug('Error checking isDirty %s' % ie) return result
[docs] def markClean(self): self._itemAtLastSave = None self._lastEventMarkClean = True self.undoChanged.call(lambda x: x(UndoEvents.UNDO_MARK_CLEAN))
[docs] def markUndoClear(self): self._itemAtLastSave = None self._lastEventMarkClean = True self.undoChanged.call(lambda x: x(UndoEvents.UNDO_CLEAR))
[docs] def increaseBlocking(self): """Set one more level of blocking""" self._undoItemBlockingLevel += 1
[docs] def decreaseBlocking(self): """Reduce level of blocking - when level reaches zero, undo is unblocked""" if self._undoItemBlockingLevel > 0: self._undoItemBlockingLevel -= 1
@property def undoList(self): try: undoState = (self.maxWaypoints, self.maxOperations, self.nextIndex, self.waypoints, self._blocked, self.undoItemBlocking, len(self), self._newItemCount, self[-1], [(undoFunc[0].__name__, undoFunc[1].__name__) for undoFunc in self], [undoFunc[0].__name__ for undoFunc in self], [undoFunc[1].__name__ for undoFunc in self]) except: undoState = (self.maxWaypoints, self.maxOperations, self.nextIndex, self.waypoints, self._blocked, self.undoItemBlocking, len(self), self._newItemCount, None, None, None, None) return undoState @property def waypointBlocking(self): """Undo blocking. If true (non-zero) undo setting is blocked. Allows multiple external functions to set blocking without trampling each other Modify with increaseBlocking/decreaseBlocking only""" return self._waypointBlockingLevel > 0
[docs] def increaseWaypointBlocking(self): """Set one more level of blocking""" self._waypointBlockingLevel += 1
[docs] def decreaseWaypointBlocking(self): """Reduce level of blocking - when level reaches zero, undo is unblocked""" if self.waypointBlocking: self._waypointBlockingLevel -= 1
[docs] def newWaypoint(self): """Start new waypoint """ if self.maxWaypoints < 1: raise ValueError("Attempt to set waypoint on Undo object that does not allow them") waypoints = self.waypoints if self._blocked or self._undoItemBlockingLevel or self.waypointBlocking: # ejb - added self._blocked 9/6/17 return # set the number of items added to the undo deque since the new waypoint was created self._newItemCount = 0 if self.nextIndex < 1: return if waypoints and waypoints[-1] == self.nextIndex - 1: # don't need to add a new waypoint return # if is the same as the last one waypoints.append(self.nextIndex - 1) # add the new waypoint to the end # if the list is too big then cull the first item if len(waypoints) > self.maxWaypoints: nRemove = waypoints[0] self.nextIndex -= nRemove for ii in range(nRemove): _popLeftItem = self.popleft() del waypoints[0] for ii, junk in enumerate(waypoints): waypoints[ii] -= nRemove
def _wrappedPartial(self, func, *args, **kwargs): partial_func = partial(func, *args, **kwargs) update_wrapper(partial_func, func) return partial_func def _newItem(self, undoPartial=None, redoPartial=None): """Add predefined partial(*) item to the undo stack. """ if self._blocked or self._undoItemBlockingLevel: return if self._debug: getLogger().debug('undo._newItem %s %s %s' % (self.undoItemBlocking, undoPartial, redoPartial)) # clear out redos that are no longer going to be doable for n in range(len(self) - self.nextIndex): self.pop() # add new undo/redo methods to the deque - keep a count self.append((undoPartial, redoPartial)) self._newItemCount += 1 # fix waypoints: ll = self.waypoints _waypoints = [ii for ii, wp in enumerate(ll) if wp == ll[-1]] if _waypoints: if len(_waypoints) > 2: raise RuntimeError('waypoint length error') # need to back-track to the previous value if duplicated ll[:] = ll[:_waypoints[0] + 1] while ll and ll[-1] >= self.nextIndex: ll.pop() # correct for maxOperations if len(self) > self.maxOperations: self.popleft() ll = self.waypoints if ll: for n, val in enumerate(ll): ll[n] = val - 1 if ll[0] < 0: del ll[0] else: self.nextIndex += 1 if self._lastEventMarkClean: # NOTE:ED - only do it for the first new item? self._lastEventMarkClean = False self.undoChanged.call(lambda x: x(UndoEvents.UNDO_ADD))
[docs] def newItem(self, undoMethod, redoMethod, undoArgs=None, undoKwargs=None, redoArgs=None, redoKwargs=None): """Add item to the undo stack. """ if self._blocked or self._undoItemBlockingLevel: return if self._debug: getLogger().debug('undo.newItem %s %s %s %s %s %s %s' % (self.undoItemBlocking, undoMethod, redoMethod, undoArgs, undoKwargs, redoArgs, redoKwargs)) if not undoArgs: undoArgs = () if not redoArgs: redoArgs = () # clear out redos that are no longer going to be doable for n in range(len(self) - self.nextIndex): self.pop() # add new data if undoKwargs is None: undoCall = self._wrappedPartial(undoMethod, *undoArgs) else: undoCall = self._wrappedPartial(undoMethod, *undoArgs, **undoKwargs) if redoKwargs is None: redoCall = self._wrappedPartial(redoMethod, *redoArgs) else: redoCall = self._wrappedPartial(redoMethod, *redoArgs, **redoKwargs) # add new undo/redo methods to the deque - keep a count newItem = (undoCall, redoCall) self.append(newItem) self._newItemCount += 1 # fix waypoints: ll = self.waypoints _waypoints = [ii for ii, wp in enumerate(ll) if wp == ll[-1]] if _waypoints: if len(_waypoints) > 2: raise RuntimeError('waypoint length error') # need to back-track to the previous value if duplicated ll[:] = ll[:_waypoints[0] + 1] while ll and ll[-1] >= self.nextIndex: ll.pop() # correct for maxOperations if len(self) > self.maxOperations: self.popleft() ll = self.waypoints if ll: for n, val in enumerate(ll): ll[n] = val - 1 if ll[0] < 0: del ll[0] else: self.nextIndex += 1 #GST hack to get round bug #GST when extra #badKeys = ('includePositiveContours', 'includeNegativeContours', 'spectrumAliasing') #badKeys = tuple(sorted(badKeys)) #testKeys = undoArgs[0].keys() #testKeys = tuple(sorted(testKeys)) if self._lastEventMarkClean: #and testKeys != badKeys: # NOTE:ED - only do it for the first new item? self._lastEventMarkClean = False self.undoChanged.call(lambda x: x(UndoEvents.UNDO_ADD))
[docs] def undo(self): """Undo one operation - or one waypoint if waypoints are set For now errors are handled by printing a warning and clearing the undo object """ # TBD: what should we do if undoMethod() throws an exception? if self.nextIndex == 0: return elif self.maxWaypoints: undoTo = -1 for val in self.waypoints: if val < self.nextIndex - 1: undoTo = val else: break else: undoTo = max(self.nextIndex - 2, -1) from ccpn.core.lib.ContextManagers import undoBlock if self.application and self.application._disableUndoException: # mode is activated with switch --disable-undo-exception # block addition of items while operating self._blocked = True with undoBlock(): undoCall = redoCall = None for n in range(self.nextIndex - 1, undoTo, -1): undoCall, redoCall = self[n] if undoCall: undoCall() self.nextIndex = undoTo + 1 # Added by Rasmus March 2015. Surely we need to reset self._blocked? self._blocked = False else: # block addition of items while operating self._blocked = True try: with undoBlock(): undoCall = redoCall = None for n in range(self.nextIndex - 1, undoTo, -1): undoCall, redoCall = self[n] if undoCall: undoCall() self.nextIndex = undoTo + 1 except Exception as e: from ccpn.util.Logging import getLogger getLogger().warning("Error while undoing (%s). Undo stack is cleared." % e) if self.application and self.application._ccpnLogging: self._logObjects() if self._debug: sys.stderr.write("UNDO DEBUG: error in undo. Last undo function was: %s\n" % undoCall) raise self.clear() finally: # Added by Rasmus March 2015. Surely we need to reset self._blocked? self._blocked = False self.undoChanged.call(lambda x: x(UndoEvents.UNDO_UNDO))
[docs] def redo(self): """Redo one waypoint - or one operation if waypoints are not set. For now errors are handled by printing a warning and clearing the undo object """ if self.nextIndex >= len(self): return elif self.maxWaypoints: redoTo = len(self) - 1 for val in reversed(self.waypoints): if val >= self.nextIndex: redoTo = val else: break else: redoTo = min(self.nextIndex, len(self)) from ccpn.core.lib.ContextManagers import undoBlock if self.application and self.application._disableUndoException: # mode is activated with switch --disable-undo-exception # block addition of items while operating self._blocked = True with undoBlock(): for n in range(self.nextIndex, redoTo + 1): undoCall, redoCall = self[n] if redoCall: redoCall() self.nextIndex = redoTo + 1 # Added by Rasmus March 2015. Surely we need to reset self._blocked? self._blocked = False else: # block addition of items while operating self._blocked = True try: with undoBlock(): for n in range(self.nextIndex, redoTo + 1): undoCall, redoCall = self[n] if redoCall: redoCall() self.nextIndex = redoTo + 1 except Exception as e: from ccpn.util.Logging import getLogger getLogger().warning("Error while redoing (%s). Undo stack is cleared." % e) if self.application and self.application._ccpnLogging: self._logObjects() if self._debug: sys.stderr.write("REDO DEBUG: error in redo. Last redo call was: %s\n" % redoCall) raise self.clear() finally: # Added by Rasmus March 2015. Surely we need to reset self._blocked? self._blocked = False self.undoChanged.call(lambda x: x(UndoEvents.UNDO_REDO))
[docs] def clear(self): """Clear and reset undo object """ self.nextIndex = 0 self.waypoints.clear() self._blocked = False self._undoItemBlockingLevel = 0 deque.clear(self) self.markUndoClear()
[docs] def canUndo(self) -> bool: """True if an undo operation can be performed """ return self.nextIndex > 0
[docs] def canRedo(self) -> bool: """True if a redo operation can be performed """ return self.nextIndex < len(self)
[docs] def numItems(self): """Return the number of undo items currently on the undo deque """ return len(self)
@property def newItemsAdded(self): """Return the number of new items that have been added to the undo deque since the last new waypoint was created """ return self._newItemCount
[docs] def clearRedoItems(self): """Clear the items above the current next index, if there has been an error adding items """ # remove unwanted items from the top of the undo deque while len(self) > self.nextIndex: self.pop() # fix waypoints - remove any that are left beyond the new end of the undo deque: ll = self.waypoints while ll and ll[-1] >= self.nextIndex - 1: ll.pop() self._newItemCount = 0
def _logObjects(self): """Ccpn Internal - log objects under review to the logger Activated with switch --ccpn-logging """ if self.application and self.application.project: _project = self.application.project _log = getLogger().debug # list the peak info _log('peakDims ~~~~~~~~~~~') _log('\n'.join([str(pk) for pk in _project.peaks])) pks = [pk._wrappedData for pk in _project.peaks] for pk in pks: for pkDim in pk.sortedPeakDims(): _log(f'{pkDim}') for pkDimContrib in pkDim.sortedPeakDimContribs(): _log(f' {pkDimContrib} {pkDimContrib.resonance}') _log('peakContribs ~~~~~~~~~~~') for pk in pks: for pkContrib in pk.sortedPeakContribs(): _log(f' {pkContrib}') for pkDimContrib in pkContrib.sortedPeakDimContribs(): _log(f' {pkDimContrib} {pkDimContrib.resonance}') _log('shifts ~~~~~~~~~~~') _log('\n'.join([str(sh) for sh in _project.chemicalShifts])) shifts = [sh._wrappedData for sh in _project.chemicalShifts] for sh in _project.chemicalShifts: _log(f' {sh} {sh.nmrAtom}') for sh in shifts: _log(f' {sh} {sh.isDeleted} {sh.resonance}') _log('resonanceGroups ~~~~~~~~~~~') _log('\n'.join([str(res) for res in _project.nmrResidues])) ress = [res._wrappedData for res in _project.nmrResidues] for res in ress: _log(f' {res}') _log('resonances ~~~~~~~~~~~') _log('\n'.join([str(res) for res in _project.nmrAtoms])) ress = [res._wrappedData for res in _project.nmrAtoms] for res in ress: _log(f' {res}') for sh in res.sortedShifts(): _log(f' {sh}')