# Source code for ccpn.core.Collection

"""
Module Documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-02-10 22:59:54 +0000 (Thu, February 10, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Ed Brooksbank $"
__date__ = "$Date: 2021-10-26 15:52:47 +0100 (Tue, October 26, 2021) $"
#=========================================================================================
# Start of code
#=========================================================================================

from typing import Optional, List, Tuple, Any, Sequence, Union
from collections import namedtuple
import fnmatch

from ccpn.core.Project import Project
from ccpn.core.lib.ContextManagers import undoBlock, ccpNmrV3CoreSetter
from ccpn.core._implementation.CollectionList import CO_COMMENT, CO_ISDELETED, \
    CO_ITEMS, CO_UNIQUEID, CO_NAME, _checkItems, _checkDuplicates
from ccpn.core._implementation.V3CoreObjectABC import V3CoreObjectABC
from ccpn.util.decorators import logCommand
from ccpn.util.OrderedSet import OrderedSet


# Record type describing one collection row; the field names are the values of the
# CO_UNIQUEID, CO_ISDELETED, CO_NAME, CO_ITEMS and CO_COMMENT constants, in that order
CollectionParameters = namedtuple('CollectionParameters', f'{CO_UNIQUEID} {CO_ISDELETED} {CO_NAME} {CO_ITEMS} {CO_COMMENT} ')


class _searchCollections():
    """
    Iterator to recursively search collections for items of type specified in objectTypes

    Each step of the iteration adds the items of the nested collections found in the
    previous step; the filtered result is available from the .items property.

    depth defines the maximum number of nested collections that the search will go through.
    Set depth=1, will return only the items in the specified collection.
    Set depth=0 for fully recursive, default=0

    Search is protected against infinite loops of collections, as items are held in an
    ordered set and only newly added collections are expanded.

    objectTypes must be a tuple of types
    collection must be a list of core objects
    search must be None or a wildcard pattern (as used by fnmatch: *, ?, [seq]),
    * is automatically added at beginning and end, unless disabled
    caseSensitive must be True/False
    """
    _MAXITERATIONS = 100

    # iterator to search through nested collections and collate all items
    def __init__(self, collection, objectTypes=None,
                 search=None, useLongPid=None, caseSensitive=False, disableLeadingTrailingSearch=None,
                 depth=0):
        """Initialise the iterator

        :param collection: iterable of the top-level items to search
        :param objectTypes: optional tuple of types used to filter the items
        :param search: optional wildcard pattern applied to the pid/longPid of the items
        :param useLongPid: match against longPid instead of pid
        :param caseSensitive: if True, only exact-case matches are accepted
        :param disableLeadingTrailingSearch: if True, do not add leading/trailing *
        :param depth: int >= 0, 0 for unlimited depth
        :raises ValueError: if depth is not an int >= 0
        """
        if not (isinstance(depth, int) and depth >= 0):
            raise ValueError('depth must be an int >= 0')

        self._items = OrderedSet(collection)
        self._objectTypes = objectTypes
        self._search = None
        self._caseSensitive = caseSensitive
        self._useLongPid = useLongPid
        self._disableLeadingTrailingSearch = disableLeadingTrailingSearch
        if search:
            # add extra wildcard searches to the leading/trailing edges
            if not disableLeadingTrailingSearch:
                if not search.endswith('*'):
                    search = search + '*'
                if not search.startswith('*'):
                    search = '*' + search
            self._search = search
        self._maxDepth = depth

        # iteration state - reset by __iter__, but defined here so that next() can be
        # called on a new instance without raising AttributeError
        self._len = 0
        self._iteration = 0

    def __iter__(self):
        # initial iterator settings
        self._len = 0
        self._iteration = 0
        return self

    def __next__(self):
        """Generate the next ordered set of collection items
        """
        self._iteration += 1
        if self._iteration > self._MAXITERATIONS:
            # code check, just to make sure that a bug doesn't cause an infinite loop
            raise RuntimeError('Max search depth reached')

        if self._maxDepth and self._iteration >= self._maxDepth:
            # if reached specified/maximum depth then stop
            raise StopIteration

        # get the newly added, nested collections - only those appended since the last step
        collections = [obj for obj in list(self._items)[self._len:] if isinstance(obj, Collection)]
        if not collections:
            # if no more added to the orderedSet then reached the bottom
            raise StopIteration

        self._len = len(self._items)
        for coll in collections:
            # update the list of items
            self._items |= OrderedSet(coll.items)

        return self.items

    def __bool__(self):
        """Return True if there are items (before filtering)
        """
        return len(self._items) > 0

    __nonzero__ = __bool__

    def __len__(self):
        """Current number of filtered items in the iterator
        """
        return len(self.items)

    @property
    def items(self):
        """Return the filtered list of items, as a tuple

        objectTypes takes priority over search; with neither, all items are returned.
        """
        if self._objectTypes:
            return tuple(obj for obj in self._items if isinstance(obj, self._objectTypes))
        if not self._search:
            return tuple(self._items)

        pattern = self._search
        # lower-cased pattern is only needed for the case-insensitive fallback
        lowerPattern = None if self._caseSensitive else pattern.lower()
        attrName = 'longPid' if self._useLongPid else 'pid'

        def _matches(obj):
            # fnmatchcase is used rather than fnmatch, as fnmatch normalises the case
            # (and path separators) on some platforms, which would defeat caseSensitive
            value = getattr(obj, attrName)
            if fnmatch.fnmatchcase(value, pattern):
                return True
            return lowerPattern is not None and fnmatch.fnmatchcase(value.lower(), lowerPattern)

        return tuple(filter(_matches, self._items))


class _searchCycles():
    """
    Iterator to recursively search collections for cycles

    The search walks the nested collections one level per iteration, starting from the
    given collection. Each path from the starting collection is tracked, and a cycle is
    recorded whenever a nested collection is already present in the path leading to it.

    depth defines the maximum number of nested collections that the search will go through.
    Set depth=0 for fully recursive, default=0
    """
    _MAXITERATIONS = 100

    # iterator to search through nested collections and return any cycles
    def __init__(self, collection, depth=0):
        """Initialise the iterator

        :param collection: the collection from which to start the search
        :param depth: int >= 0, 0 for unlimited depth
        :raises ValueError: if depth is not an int >= 0
        """
        if not (isinstance(depth, int) and depth >= 0):
            raise ValueError('depth must be an int >= 0')

        self._collection = collection
        self._maxDepth = depth
        # define the iteration state here so that bool()/len() work before iterating
        self._reset()

    def _reset(self):
        """Reset the search to the starting collection
        """
        # list of (collection, path-to-collection) for the level about to be processed
        self._items = [(self._collection, [self._collection])]
        self._len = 0
        self._iteration = 0
        self._cycles = ()

    def __iter__(self):
        # initial iterator settings
        self._reset()
        return self

    def __next__(self):
        """Process the next level of nested collections, and return the cycles found so far
        """
        self._iteration += 1
        if self._iteration > self._MAXITERATIONS:
            # code check, just to make sure that a bug doesn't cause an infinite loop
            raise RuntimeError('Max search depth reached')

        if self._maxDepth and self._iteration > self._maxDepth:
            # if reached specified/maximum depth then stop
            raise StopIteration
        if not self._items:
            raise StopIteration

        # process the whole level in one step, so that the iteration count is the nesting
        # depth and does not grow with the number of collections at each level
        nextLevel = []
        found = []
        for state, path in self._items:
            for nextCol in state.items:
                if not isinstance(nextCol, Collection):
                    continue
                if nextCol in path:
                    # already visited on this path - the cycle starts at its first occurrence
                    found.append(tuple(path[path.index(nextCol):]))
                else:
                    nextLevel.append((nextCol, path + [nextCol]))

        self._items = nextLevel
        self._cycles += tuple(found)

        return self._cycles

    def __bool__(self):
        """Return True if there are cycles
        """
        return len(self._cycles) > 0

    __nonzero__ = __bool__

    def __len__(self):
        """Current number of cycles in the iterator
        """
        return len(self._cycles)


class Collection(V3CoreObjectABC):
    """Collection object, holding a list of core objects.
    """

    #: Short class name, for PID.
    shortClassName = 'CO'
    # Attribute it necessary as subclasses must use superclass className
    className = 'Collection'

    _parentClass = Project

    #: Name of plural link to instances of class
    _pluralLinkName = 'collections'

    #: List of child classes.
    _childClasses = []

    _isGuiClass = False

    # the attribute name used by current
    _currentAttributeName = 'collections'

    def __init__(self, project, collectionList, _uniqueId):
        """Create a new instance of v3 collection

        _unique Id links the collection to the dataFrame storage and MUST be specified
        before the collection can be used
        """
        super().__init__(project, collectionList, _uniqueId)

    #=========================================================================================
    # CCPN Properties
    #=========================================================================================

    #=========================================================================================
    # Class Properties and methods
    #=========================================================================================

    @property
    def items(self) -> Tuple[Any, ...]:
        """Tuple of items attached to the collection

        The stored pids are resolved through the project; pids that do not resolve to an
        object are skipped. An empty tuple is returned if the items cannot be read.
        """
        try:
            _getByPid = self._project.getByPid
            _itms = self._wrapperList._getAttribute(self._uniqueId, CO_ITEMS, list)

            # hiding the Nones handles the undo/redo behaviour for items
            # NOTE: test explicitly for None - a truthiness filter would also hide any valid
            #       object that evaluates as False, e.g., a nested collection of length 0
            return tuple(obj for obj in map(_getByPid, _itms or ()) if obj is not None)

        except Exception:
            # deliberate best-effort, but do not trap KeyboardInterrupt/SystemExit
            return ()

    @items.setter
    @logCommand(get='self', isProperty=True)
    @ccpNmrV3CoreSetter()
    def items(self, values):
        """Set the items in the collection

        :param values: core objects, validated by _checkItems and _checkDuplicates
        :raises ValueError: if the collection itself is in values
        """
        values = _checkItems(self.project, values)
        if self in values:
            raise ValueError(f'Cannot add {self} to itself')
        _checkDuplicates(values)

        _oldItems = set(self.items)
        self._wrapperList._setAttribute(self._uniqueId, CO_ITEMS, [itm.pid for itm in values])

        # notify the items have been added to/removed from collection
        # (symmetric difference of new and old items, never including this collection)
        _notify = (set(values) ^ _oldItems) - {self}
        self._finaliseChildren.extend((itm, 'change') for itm in _notify)

    def __len__(self):
        """Return the number of items in the collection
        """
        return len(self.items)

    #=========================================================================================
    # Implementation functions
    #=========================================================================================

    def _rename(self, value):
        """Rename the collection

        :param value: requested name, made unique through _uniqueName
        :return: tuple containing the previous name
        """
        # validate the name
        name = self._uniqueName(project=self.project, name=value)

        # rename functions from here
        oldName = self.name
        self._oldPid = self.pid
        self._wrapperList.renameCollection(self, name=name)

        return (oldName,)

    #=========================================================================================
    # Implementation functions - necessary as there is no abstractWrapper object
    #=========================================================================================

    def delete(self):
        """Delete the collection
        """
        self._wrapperList.deleteCollection(uniqueId=self._uniqueId)
        # raise RuntimeError(f'{self.className}.delete: Please use CollectionList.deleteCollection()')  # optional error-trap

    #=========================================================================================
    # CCPN functions
    #=========================================================================================

    def _checkNestedCollections(self, items: Sequence[Any]):
        """Check the list of items, as core objects
        Raise an error if there are any cycles

        CCPNInternal - to be used after _checkItem on flat list, not used yet but searchCycles available
        """
        pass

    @logCommand(get='self')
    def addItems(self, items: Sequence[Any]):
        """
        Add an object or list of core objects to the collection.
        Action is ignored if the list is empty.

        Raise an error if there are any non-core objects

        :param items: single object or list of core objects, as objects or pid strings.
        :raises ValueError: if the collection is added to itself, or any item is already
                            in the collection
        """
        items = _checkItems(self.project, items)
        if self in items:
            raise ValueError(f'Cannot add {self} to itself')
        _checkDuplicates(items)

        if items:
            _currentItms = self.items

            # check whether any of the items are already in the collection
            _exists = tuple(itm for itm in items if itm in _currentItms)
            if _exists:
                if len(_exists) > 1:
                    raise ValueError(f'items {_exists} already in collection {self}')
                else:
                    raise ValueError(f'item {_exists[0]} already in collection {self}')

            with undoBlock():
                # append the new items in a single update
                self.items = _currentItms + tuple(items)

    @logCommand(get='self')
    def removeItems(self, items: Sequence[Any]):
        """
        Remove an object or list of core objects from the collection.
        The items must belong to the collection.
        Action is ignored if the list is empty.

        Raise an error if there are any non-core objects.

        :param items: single object or list of core objects, as objects or pid strings.
        :raises ValueError: if any item is not in the collection
        """
        items = _checkItems(self.project, items)

        if items:
            _currentItms = list(self.items)

            # check whether any of the items are missing from the collection
            _missing = tuple(itm for itm in items if itm not in _currentItms)
            if _missing:
                if len(_missing) > 1:
                    raise ValueError(f'items {_missing} not in collection {self}')
                else:
                    raise ValueError(f'item {_missing[0]} not in collection {self}')

            with undoBlock():
                for itm in items:
                    _currentItms.remove(itm)
                self.items = _currentItms

    @logCommand(get='self')
    def getByObjectType(self, objectTypes=None,
                        search=None, useLongPid=None, caseSensitive=None, disableLeadingTrailingSearch=None,
                        recursive=None, depth=None,
                        ):
        """Return a list of items of types specified in objectTypes list.

        ObjectTypes is a list of core objects expressed as object classes or short/long classnames.
        If objectTypes is not specified then all objects will be returned.

        For example, class Note can be specified as Note, 'Note', 'NO', 'note' or 'no'.
        caseSensitive is False by default, if caseSensitive is True, objectTypes as class names
        must be specified exactly.
        ObjectTypes can be a single item or tuple/list, or None to return all items.
        Any Nones in lists will be ignored.

        search is a wildcard search string (matched with fnmatch, not a full regular expression)
        applied to the pids of objects in the collection.
        useLongPid can be specified with the search option to use the longPid descriptor of
        core objects as the search item.
        If caseSensitive is True, the exact pid or longPid is used for the search.
        Searches can use * as wildcard to represent any number of characters.
        Use ? to specify a single character.
        Leading and trailing *'s are added by default.
        This can be disabled with disableLeadingTrailingSearch=True

        Set depth=0 or recursive=True to search through all nested collections,
        recursive=False or depth=1 will only search through the top collection, ignoring nested collections.
        Full recursion is the default setting.
        Please specify either recursive OR depth.
        Please specify either objectTypes OR search.

        Examples:

        ::

            collection.getByObjectType()
            collection.getByObjectType(objectTypes=Note)
            collection.getByObjectType(objectTypes='NO', recursive=False)
            collection.getByObjectType(objectTypes='Note', depth=2)
            collection.getByObjectType(objectTypes=['Note', Spectrum])

            collection.getByObjectType(search='someNotes', useLongPid=True, caseSensitive=True)

        :param objectTypes: optional single item, or list of core objects as object class or classnames
        :param search: optional wildcard search string
        :param useLongPid: optional True/False, only use with search
        :param caseSensitive: optional True/False
        :param disableLeadingTrailingSearch: optional True/False, only use with search
        :param recursive: optional True/False, do not combine with depth
        :param depth: optional int >= 0, do not combine with recursive
        :return: tuple of core items.
        :raises ValueError: if any of the parameters are of the wrong type, or are incompatible
        """
        # check that the parameters are the correct, compatible types
        if (recursive is not None and depth is not None):
            raise ValueError('Please specify either recursive or depth')
        if (objectTypes is not None and search is not None):
            raise ValueError('Please specify either objectTypes or search')
        if depth is not None and not (isinstance(depth, int) and depth >= 0):
            raise ValueError('depth must be int >= 0; use 0 for full depth')
        if not isinstance(search, (str, type(None))):
            raise ValueError('search must be a regex string')
        for param, paramName in [(useLongPid, 'useLongPid'),
                                 (caseSensitive, 'caseSensitive'),
                                 (recursive, 'recursive'),
                                 (disableLeadingTrailingSearch, 'disableLeadingTrailingSearch')]:
            if param not in [None, True, False]:
                raise ValueError(f'{paramName} must be True/False')
        if useLongPid is not None and not search:
            raise ValueError('useLongPid only valid when search is specified')
        if disableLeadingTrailingSearch is not None and not search:
            raise ValueError('disableLeadingTrailingSearch only valid when search is specified')

        if isinstance(objectTypes, str) or (objectTypes is not None and self.project.isCoreClass(objectTypes)):
            # change a single item to a list, if is a str or a core object (object with .pid), strings are checked later
            objectTypes = [objectTypes, ]
        if not isinstance(objectTypes, (list, tuple, type(None))):
            raise ValueError('objectTypes must be list/tuple of core objects or classnames, or single core object or None')

        # remove any Nones from the list
        if objectTypes is not None:
            objectTypes = [itm for itm in objectTypes if itm is not None]

        _objectTypes = None
        if objectTypes:
            # check the list of object types against the project classNames
            if caseSensitive:
                # case sensitive search - check against all defined core objects in project
                _allObjectTypes = self.project._className2Class
                _allList = self.project._className2ClassList
            else:
                # case insensitive search - change all to lowercase
                _allObjectTypes = self.project._classNameLower2Class
                _allList = self.project._classNameLower2ClassList

            def _key(itm):
                # class names are compared in lowercase unless caseSensitive; classes as-is
                return itm.lower() if (isinstance(itm, str) and not caseSensitive) else itm

            _badObjectTypes = [itm for itm in objectTypes if _key(itm) not in _allList]
            if _badObjectTypes:
                raise ValueError(f'objectTypes contains bad items: {_badObjectTypes}')

            # change all valid strings to core object types
            _objectTypes = tuple(_allObjectTypes[_key(itm)] if isinstance(itm, str) else itm
                                 for itm in objectTypes)

        # depth=1 restricts the search to this collection; 0 is fully recursive
        _noRecursion = recursive is not None and not recursive
        _depth = depth or (1 if _noRecursion else 0)

        # create a search iterator
        recurse = _searchCollections(self.items, objectTypes=_objectTypes,
                                     search=search, useLongPid=useLongPid, caseSensitive=caseSensitive,
                                     disableLeadingTrailingSearch=disableLeadingTrailingSearch,
                                     depth=_depth)
        # exhaust the iterator to collate the items from the nested collections
        for _ in recurse:
            pass

        # return the filtered list
        return recurse.items if recurse else ()

    def searchCycles(self, depth=0):
        """Check whether there are any cycles in the collections

        :param depth: int >= 0, maximum depth of nested collections to search, 0 for unlimited
        :return: tuple of cycles, each a tuple of the collections in the cycle.
        """
        # create a search iterator
        recurse = _searchCycles(self, depth=depth)
        # exhaust the iterator to walk through the nested collections
        for _ in recurse:
            pass

        # return the list of cycles
        return recurse._cycles if recurse else ()
#===========================================================================================
# new<Object> and other methods
# Call appropriate routines in their respective locations
#===========================================================================================

#=========================================================================================
# Connections to parents:
#=========================================================================================

def _newCollection(project: Project, collectionList, _uniqueId: Optional[int] = None):
    """Create a new collection attached to the collectionList.

    :param project: core project
    :param collectionList: parent collectionList
    :param _uniqueId: _unique int identifier
    :return: a new Collection instance.
    """

    result = Collection(project, collectionList, _uniqueId=_uniqueId)
    if result is None:
        # defensive check - a constructor call is not expected to return None
        raise RuntimeError(f'{collectionList.__class__.__name__}._newCollection: unable to generate new Collection item')

    return result


def _getByTuple(collectionList,
                name: str,
                items: Union[List[Union[Any, str, None]], Tuple[Union[Any, str, None]]] = None,
                comment: str = None):
    """Create a new tuple object from the supplied parameters
    Check whether a valid tuple can be created, otherwise raise the appropriate errors

    The uniqueId and isDeleted fields are left as None; the items are stored as the
    string representation of the list of item pids.

    CCPN Internal

    :param collectionList: parent collectionList, used to resolve the items
    :param name: name for the collection, converted to str
    :param items: optional list of core objects, as objects or pid strings
    :param comment: optional comment
    :return: a CollectionParameters namedtuple.
    """
    if items:
        items = _checkItems(collectionList._project, items)
        _checkDuplicates(items)

    _itemPids = str([itm.pid for itm in items]) if items else None

    return CollectionParameters(None, None, str(name), _itemPids, comment)


def _getCollection(project: Project, name: str) -> Optional[Collection]:
    """Return the collection from the supplied name

    :param project: current project
    :param name: non-empty str
    :return: the Collection instance, or None if not found.
    :raises ValueError: if name is not a non-empty str
    """
    if not (name and isinstance(name, str)):
        raise ValueError('name must be of type str')

    # dict of the collections in the project, keyed by name
    dd = project._pid2Obj.get(Collection.className)
    if dd:
        return dd.get(name)
    else:
        return None


def _fetchCollection(project: Project, name: Optional[str] = None) -> Collection:
    """Get or create a new Collection instance
    Supplying None for the name will return a new Collection with the next available name.

    :param project: current project
    :param name: str
    :return: new or existing Collection instance.
    :raises ValueError: if name is not a str or None
    """
    if not isinstance(name, (str, type(None))):
        raise ValueError('name must be of type str or None')

    # test explicitly for None - Collection defines __len__, so an existing collection
    # containing no items may evaluate as False and must not trigger a new collection
    if name and (result := _getCollection(project, name)) is not None:
        return result

    with undoBlock():
        result = project.newCollection(name=name)
    return result