Source code for ccpn.core.lib.PeakClustering

"""
This module contains  clustering routines.
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
               "Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Luca Mureddu $"
__dateModified__ = "$dateModified: 2022-03-04 18:50:46 +0000 (Fri, March 04, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Luca Mureddu $"
__date__ = "$Date: 2022-02-02 14:08:56 +0000 (Wed, February 02, 2022) $"
#=========================================================================================
# Start of code
#=========================================================================================


import abc
import numpy as np
import itertools
from typing import Tuple
from ccpn.core.lib.ContextManagers import undoBlockWithoutSideBar
from ccpn.util.Logging import getLogger

DFS_PeakClusterer = 'DFS_PeakClusterer'

[docs]class PeakClustererABC(object): """ Base class for peak clustering """ name = 'PeakClustererABC' info = 'The algorithm textual information' def __init__(self, peaks, tolerances, **kwargs): self.peaks = peaks self.tolerances = tolerances # list of tolerances (in points) in the same length as the spectrum.dimensionCount self.kwargs = kwargs self._checkInititialConditions()
[docs] @abc.abstractmethod def findClusters(self, *args, **kwargs) -> Tuple[Tuple['Peak', ...], ...]: """ The main method to find clusters given a list of peaks. Implement method in subclass. :param args: any :param kwargs: any :return: A tuple of tuples of peaks. Each tuple represents a cluster. """ pass
[docs] def setClusterIdToPeaks(self, clusters): """ :param clusters: A tuple of tuples of peaks. Each tuple is a cluster. :return: None. Adds the enumeration as cluster Id to the peaks """ with undoBlockWithoutSideBar(): for num, cluster in enumerate(clusters): for peak in cluster: peak.clusterId = num + 1
def _checkInititialConditions(self, errorPolicy='warning'): """ Check if the Clustering can initiate. :param errorPolicy: One of warning, raise, critical :return: None """ isOkToProceed = all([pk.spectrum.dimensionCount == len(self.tolerances) for pk in self.peaks]) if not isOkToProceed: doLog = getattr(getLogger(), errorPolicy, getLogger().warning) doLog('Searching tolerances not set correctly. Ensure all peaks are of same dimensionality ' 'and the tolerances list is the same length as the spectrum.dimensionCount') def _getOverlapPairsByPositions(self): """ Consider two peaks adjacent if the PointPositions are within the tolerances. :param peaks: :param tolerances: list of tolerances (in points). Same length of spectrum.dimensionCount :return: A list of adjacent pairs of peaks """ overlaps = [] lims = np.array(self.tolerances) for pair in itertools.combinations(self.peaks, 2): dd = np.abs(np.array(pair[0].pointPositions) - np.array(pair[1].pointPositions)) if all(dd < lims): overlaps.append(pair) return overlaps
[docs]class DFSPeakClusterer(PeakClustererABC): name = DFS_PeakClusterer info = 'Depth-first search (DFS) algorithm for peak clustering by pointPositions'
[docs] def findClusters(self, *args, **kwargs): """ Find clusters using a cluster-graph approach. - Ref 1) https://en.wikipedia.org/wiki/Graph_theory - Ref 2) https://en.wikipedia.org/wiki/Cluster_graph """ overlappedPeaks = self._getOverlapPairsByPositions() positions = [pk.pointPositions for pk in self.peaks] overlappedPositions = [(pair[0].pointPositions, pair[1].pointPositions) for pair in overlappedPeaks] result = DFSPeakClusterer._getClusterOverlaps(positions, overlappedPositions) clusteredPeaks = [] for i, group in enumerate(result): peakCluster = [] for peak in self.peaks: for j in group: if j == peak.pointPositions: peakCluster.append(peak) clusteredPeaks.append(tuple(peakCluster)) return tuple(clusteredPeaks)
@staticmethod def _getClusterOverlaps(nodes, adjacents): """ A cluster graph algorithm. Code seen in: https://stackoverflow.com/questions/14607317/ """ clusters = [] nodes = list(nodes) while len(nodes): node = nodes[0] path = DFSPeakClusterer._getPathByDFS(node, adjacents, nodes) clusters.append(path) for pt in path: nodes.remove(pt) return clusters @staticmethod def _getPathByDFS(start, adjacents, nodes): """ A cluster graph algorithm. Code seen in: https://stackoverflow.com/questions/14607317/ """ path = [] q = [start] while q: node = q.pop(0) if path.count(node) >= nodes.count(node): continue path = path + [node] nextNodes = [p2 for p1,p2 in adjacents if p1 == node] q = nextNodes + q return path
PeakClusterers = { DFSPeakClusterer.name: DFSPeakClusterer }