Source code for ccpn.ui.gui.modules.RestraintAnalysisTable
"""
Module Documentation here
"""
#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (https://www.ccpn.ac.uk) 2014 - 2022"
__credits__ = ("Ed Brooksbank, Joanna Fox, Victoria A Higman, Luca Mureddu, Eliza Płoskoń",
"Timothy J Ragan, Brian O Smith, Gary S Thompson & Geerten W Vuister")
__licence__ = ("CCPN licence. See https://ccpn.ac.uk/software/licensing/")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
"CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
"J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Ed Brooksbank $"
__dateModified__ = "$dateModified: 2022-03-08 18:22:25 +0000 (Tue, March 08, 2022) $"
__version__ = "$Revision: 3.1.0 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Ed Brooksbank $"
__date__ = "$Date: 2021-04-26 11:53:10 +0100 (Mon, April 26, 2021) $"
#=========================================================================================
# Start of code
#=========================================================================================
import numpy as np
import pandas as pd
import random
from functools import partial
from PyQt5 import QtWidgets
from collections import OrderedDict
from itertools import zip_longest
from ccpn.core.lib.CcpnSorting import universalSortKey
from ccpn.core.Peak import Peak
from ccpn.core.PeakList import PeakList
from ccpn.core.Restraint import Restraint
from ccpn.core.RestraintTable import RestraintTable
from ccpn.core.ViolationTable import ViolationTable
from ccpn.core.lib.CallBack import CallBack
from ccpn.core.lib.DataFrameObject import DataFrameObject
from ccpn.ui.gui.modules.CcpnModule import CcpnModule
from ccpn.ui.gui.widgets.PulldownListsForObjects import PeakListPulldown
from ccpn.ui.gui.widgets.GuiTable import GuiTable, _getValueByHeader
from ccpn.ui.gui.widgets.Column import ColumnClass
from ccpn.ui.gui.widgets.ButtonList import ButtonList
from ccpn.ui.gui.widgets.CompoundWidgets import DoubleSpinBoxCompoundWidget
from ccpn.ui.gui.widgets.Button import Button
from ccpn.ui.gui.widgets.Spacer import Spacer
from ccpn.ui.gui.widgets.Icon import Icon
from ccpn.ui.gui.widgets.TableSorting import MultiColumnTableWidgetItem
from ccpn.ui.gui.widgets.SettingsWidgets import ModuleSettingsWidget, \
RestraintTableSelectionWidget, SpectrumDisplaySelectionWidget, ViolationTableSelectionWidget
from ccpn.util.Logging import getLogger
from ccpn.util.Common import makeIterableList
import ccpn.ui.gui.modules.PyMolUtil as pyMolUtil
from ccpn.ui.gui.widgets import MessageDialog
from ccpn.util.Common import flattenLists
from ccpn.util.Path import Path, aPath, fetchDir, joinPath
logger = getLogger()
UNITS = ['ppm', 'Hz', 'point']
LINKTOPULLDOWNCLASS = 'linkToPulldownClass'
HeaderIndex = '#'
HeaderPeak = 'Peak Serial'
HeaderObject = '_object'
HeaderExpand = 'Expand'
HeaderRestraint = 'Restraint Pid'
HeaderAtoms = 'Atoms'
HeaderTarget = 'Target Value'
HeaderLowerLimit = 'Lower Limit'
HeaderUpperLimit = 'Upper Limit'
HeaderMin = 'Min'
HeaderMax = 'Max'
HeaderMean = 'Mean'
HeaderStd = 'STD'
HeaderCount1 = 'Count > 0.3'
HeaderCount2 = 'Count > 0.5'
nefHeaders = ['restraintpid', 'atoms',
'target_value', 'lower_limit', 'upper_limit',
'min', 'max', 'mean', 'std',
'count_0_3', 'count_0_5']
Headers = [HeaderRestraint,
HeaderAtoms,
HeaderTarget,
HeaderLowerLimit,
HeaderUpperLimit,
HeaderMin,
HeaderMax,
HeaderMean,
HeaderStd,
HeaderCount1,
HeaderCount2]
_OLDHEADERS = {'RestraintPid': HeaderRestraint,
'Count>0.3' : HeaderCount1,
'Count>0.5' : HeaderCount2}
ALL = '<Use all>'
PymolScriptName = 'Restraint_Pymol_Template.py'
_SPECTRUMDISPLAYS = 'SpectrumDisplays'
_RESTRAINTTABLES = 'RestraintTables'
_VIOLATIONTABLES = 'ViolationTables'
_RESTRAINTTABLE = 'restraintTable'
_VIOLATIONRESULT = 'violationResult'
[docs]class RestraintAnalysisTableModule(CcpnModule):
"""
This class implements the module by wrapping a RestraintAnalysisTable instance
"""
includeSettingsWidget = True
maxSettingsState = 2
settingsPosition = 'left'
settingsMinimumSizes = (500, 200)
includeDisplaySettings = True
includePeakLists = False
includeNmrChains = False
includeSpectrumTable = False
className = 'RestraintAnalysisTableModule'
_allowRename = True
activePulldownClass = None # e.g., can make the table respond to current peakList
def __init__(self, mainWindow=None, name='Restraint Analysis Table',
peakList=None, selectFirstItem=False):
super().__init__(mainWindow=mainWindow, name=name)
# Derive application, project, and current from mainWindow
self.mainWindow = mainWindow
if mainWindow:
self.application = mainWindow.application
self.project = mainWindow.application.project
self.current = mainWindow.application.current
self.scriptsPath = self.application.scriptsPath
self.pymolScriptsPath = fetchDir(self.scriptsPath, 'pymol')
else:
self.application = None
self.project = None
self.current = None
# add the settings widgets defined from the following orderedDict - test for refactored
settingsDict = OrderedDict(((_SPECTRUMDISPLAYS, {'label' : '',
'tipText' : '',
'callBack': None, #self.restraintTablePulldown,
'enabled' : True,
'_init' : None,
'type' : SpectrumDisplaySelectionWidget,
'kwds' : {'texts' : [],
'displayText' : [],
'defaults' : [],
'objectName' : 'SpectrumDisplaysSelection',
'minimumWidths': (180, 100, 100)},
}),
(_RESTRAINTTABLES, {'label' : '',
'tipText' : '',
'callBack': None, #self.restraintTablePulldown,
'enabled' : True,
'_init' : None,
'type' : RestraintTableSelectionWidget,
'kwds' : {'texts' : [],
'displayText' : [],
'defaults' : [],
'objectName' : 'RestraintTablesSelection',
'minimumWidths': (180, 100, 100)},
}),
(_VIOLATIONTABLES, {'label' : '',
'tipText' : '',
'callBack': None,
'enabled' : True,
'_init' : None,
'type' : ViolationTableSelectionWidget,
'kwds' : {'texts' : [],
'displayText' : [],
'defaults' : [],
'objectName' : 'RestraintTablesSelection',
'minimumWidths': (180, 100, 100)},
}),
# ('autoExpand', {'label' : '',
# 'tipText' : '',
# 'callBack': self._updateAutoExpand,
# 'enabled' : True,
# 'checked' : False,
# '_init' : None,
# 'type' : RadioButtonsCompoundWidget,
# 'kwds' : {'labelText' : 'Auto-Expand Groups',
# 'compoundKwds': {'direction': 'h',
# 'hAlign' : 'l',
# 'texts' : ['Collapse', 'Expand', 'Ignore'], }
# }
# }),
('meanLowerLimit', {'label' : 'Mean Value Lower Limit',
'callBack': self._updateMeanLowerLimit,
'enabled' : True,
'_init' : None,
'type' : DoubleSpinBoxCompoundWidget,
'kwds' : {'labelText' : 'Mean Value Lower Limit',
'tipText' : 'Lower threshold for mean value of restraints',
'range' : (0.0, 1.0),
'decimals' : 2,
'step' : 0.05,
'value' : 0.3,
'minimumWidths': (180, 100, 100)},
}),
('autoExpand', {'label' : 'Auto-expand Groups',
'tipText' : 'Automatically expand/collapse groups on\nadding new restraintTable, or sorting.',
'callBack': self._updateAutoExpand,
'enabled' : True,
'checked' : False,
'_init' : None,
}),
('sequentialStrips', {'label' : 'Show sequential strips',
'tipText' : 'Show nmrResidue in all strips.',
'callBack': None, #self.showNmrChainFromPulldown,
'enabled' : True,
'checked' : False,
'_init' : None,
}),
('markPositions', {'label' : 'Mark positions',
'tipText' : 'Mark positions in all strips.',
'callBack': None, #self.showNmrChainFromPulldown,
'enabled' : True,
'checked' : True,
'_init' : None,
}),
('autoClearMarks', {'label' : 'Auto clear marks',
'tipText' : 'Auto clear all previous marks',
'callBack': None,
'enabled' : True,
'checked' : True,
'_init' : None,
}),
))
if self.activePulldownClass:
settingsDict.update(OrderedDict(((LINKTOPULLDOWNCLASS, {'label' : 'Link to current %s:' % self.activePulldownClass.className,
'tipText' : 'Set/update current %s when selecting from pulldown' % self.activePulldownClass.className,
'callBack': None,
'enabled' : True,
'checked' : True,
'_init' : None,
}),
)))
self._RATwidget = ModuleSettingsWidget(parent=self.settingsWidget, mainWindow=self.mainWindow,
settingsDict=settingsDict,
grid=(0, 0))
self._displayListWidget = self._RATwidget.getWidget(_SPECTRUMDISPLAYS)
self._restraintTable = self._RATwidget.getWidget(_RESTRAINTTABLES)
self._restraintTable.listWidget.changed.connect(self._updateRestraintTables)
self._outputTable = self._RATwidget.getWidget(_VIOLATIONTABLES)
self._outputTable.listWidget.changed.connect(self._updateOutputTables)
self._meanLowerLimitSpinBox = self._RATwidget.checkBoxes['meanLowerLimit']['widget']
self._autoExpandCheckBox = self._RATwidget.checkBoxes['autoExpand']['widget']
# mainWidget
self.restraintAnalysisTable = RestraintAnalysisTableWidget(parent=self.mainWidget,
mainWindow=self.mainWindow,
moduleParent=self,
setLayout=True,
grid=(0, 0))
if peakList is not None:
self.selectPeakList(peakList)
elif selectFirstItem:
self.restraintAnalysisTable.pLwidget.selectFirstItem()
self.installMaximiseEventHandler(self._maximise, self._closeModule)
@property
def _dataFrame(self):
if self.restraintAnalysisTable._dataFrameObject:
return self.restraintAnalysisTable._dataFrameObject.dataFrame
def _maximise(self):
"""
Maximise the attached table
"""
self.restraintAnalysisTable._maximise()
[docs] def selectPeakList(self, peakList=None):
"""
Manually select a peakList from the pullDown
"""
self.restraintAnalysisTable._selectPeakList(peakList)
def _closeModule(self):
"""Re-implementation of closeModule function from CcpnModule to unregister notification """
self.restraintAnalysisTable._close()
super()._closeModule()
[docs] def restoreWidgetsState(self, **widgetsState):
super().restoreWidgetsState(**widgetsState)
getLogger().debug(f'RestraintTableModule {self} - restoreWidgetsState')
# need to set the values from the restored state
self.restraintAnalysisTable._updateSettings(self._meanLowerLimitSpinBox.getValue(),
self._autoExpandCheckBox.get())
@property
def dataFrame(self):
return self.restraintAnalysisTable.dataFrame
@dataFrame.setter
def dataFrame(self, value):
self.restraintAnalysisTable.dataFrame = value
def _updateRestraintTables(self, *args):
"""Update the selected restraintTables
"""
restraintTables = self._restraintTable.getTexts()
if ALL in restraintTables:
restraintTables = self.project.restraintTables
else:
restraintTables = [self.project.getByPid(rList) for rList in restraintTables]
restraintTables = [rList for rList in restraintTables if rList is not None and isinstance(rList, RestraintTable)]
self.restraintAnalysisTable.updateRestraintTables(restraintTables)
def _updateOutputTables(self, *args):
"""Update the selected outputTables
"""
outputTables = self._outputTable.getTexts()
if ALL in outputTables:
outputTables = self.project.violationTables
else:
outputTables = [self.project.getByPid(rList) for rList in outputTables]
outputTables = list(filter(None, outputTables))
self.restraintAnalysisTable.updateOutputTables(outputTables)
def _updateAutoExpand(self, expand):
# index = self._expandSelector.getIndex()
self.restraintAnalysisTable.updateAutoExpand(expand)
def _updateMeanLowerLimit(self, value):
self.restraintAnalysisTable.updateMeanLowerLimit(value)
[docs]class RestraintAnalysisTableWidget(GuiTable):
"""
Class to present a Restraint Analysis Table
"""
className = 'RestraintAnalysisTable'
attributeName = 'peakLists'
positionsUnit = UNITS[0] #default
# define _columns for multi-column sorting
ITEMKLASS = MultiColumnTableWidgetItem
MERGECOLUMN = 1
PIDCOLUMN = 1
EXPANDERCOLUMN = 3
SPANCOLUMNS = (0, 1, 2, 3)
MINSORTCOLUMN = 3
enableMultiColumnSort = True
# groups are always max->min
applySortToGroups = False
PRIMARYCOLUMN = '_object' # column holding active objects (pids for this table)
# not the cleanest way for the minute
defaultHidden = ['Min_1', 'Min_2', 'Min_3', 'Min_4', 'Min_5',
'Min_6', 'Min_7', 'Min_8', 'Min_9',
'Max_1', 'Max_2', 'Max_3', 'Max_4', 'Max_5',
'Max_6', 'Max_7', 'Max_8', 'Max_9',
'Mean_1', 'Mean_2', 'Mean_3', 'Mean_4', 'Mean_5',
'Mean_6', 'Mean_7', 'Mean_8', 'Mean_9',
'STD_1', 'STD_2', 'STD_3', 'STD_4', 'STD_5',
'STD_6', 'STD_7', 'STD_8', 'STD_9',
]
def __init__(self, parent=None, mainWindow=None, moduleParent=None, peakList=None, multiSelect=True,
actionCallback=None, selectionCallback=None, hiddenColumns=None, **kwds):
"""
Initialise the table
"""
# Derive application, project, and current from mainWindow
self.mainWindow = mainWindow
if mainWindow:
self.application = mainWindow.application
self.project = mainWindow.application.project
self.current = mainWindow.application.current
else:
self.application = None
self.project = None
self.current = None
RestraintAnalysisTableWidget.project = self.project
self.settingWidgets = None
self._selectedPeakList = None
kwds['setLayout'] = True # Assure we have a layout with the widget
self._restraintTables = []
self._outputTables = []
self._autoExpand = False
self._meanLowerLimit = 0.0
# Initialise the scroll widget and common settings
self._initTableCommonWidgets(parent, **kwds)
row = 0
self.spacer = Spacer(self._widget, 5, 5,
QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed,
grid=(row, 0), gridSpan=(1, 1))
row += 1
gridHPos = 0
self.pLwidget = PeakListPulldown(parent=self._widget,
mainWindow=self.mainWindow,
grid=(row, gridHPos), gridSpan=(1, 1),
showSelectName=True,
minimumWidths=(0, 100),
sizeAdjustPolicy=QtWidgets.QComboBox.AdjustToContents,
callback=self._pulldownPLcallback,
)
gridHPos += 1
self.expandButtons = ButtonList(parent=self._widget, texts=[' Expand all', ' Collapse all'], grid=(row, gridHPos),
callbacks=[partial(self._expandAll, True), partial(self._expandAll, False), ])
gridHPos += 1
self.showOnViewerButton = Button(self._widget, tipText='Show on Molecular Viewer',
icon=Icon('icons/showStructure'),
callback=self._showOnMolecularViewer,
grid=(row, gridHPos), hAlign='l')
row += 1
self.spacer = Spacer(self._widget, 5, 5,
QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed,
grid=(row, gridHPos + 1), gridSpan=(1, 1))
self._widget.getLayout().setColumnStretch(gridHPos + 1, 2)
self._hiddenColumns = hiddenColumns or self.defaultHidden
self.dataFrameObject = None
selectionCallback = self._selectionCallback if selectionCallback is None else selectionCallback
actionCallback = self._actionCallback if actionCallback is None else actionCallback
super().__init__(parent=parent,
mainWindow=self.mainWindow,
dataFrameObject=None,
setLayout=True,
autoResize=True, multiSelect=multiSelect,
actionCallback=actionCallback,
selectionCallback=selectionCallback,
grid=(3, 0), gridSpan=(1, 6))
self.moduleParent = moduleParent
# populate the table if there are peakLists in the project
if peakList is not None:
self._selectPeakList(peakList)
self.setTableNotifiers(tableClass=PeakList,
rowClass=Peak,
# cellClassNames=(Restraint, 'restraints'),
# tableName='peakList', rowName='peak',
# changeFunc=self._updateTable,
className=self.attributeName,
# updateFunc=self._updateTable,
tableSelection='_selectedPeakList',
pullDownWidget=self.pLwidget,
callBackClass=Peak,
selectCurrentCallBack=self._selectOnTableCurrentPeaksNotifierCallback,
moduleParent=moduleParent)
# Initialise the notifier for processing dropped items
self._postInitTableCommonWidgets()
# update method for ccpn sorting with groups
# TableWidgetItem.__lt__ = __ltForTableWidgetItem__
self.horizontalHeader().sectionClicked.connect(self.onSectionClicked)
self.horizontalHeader().setMinimumSectionSize(32)
self._downIcon = Icon('icons/caret-grey-down')
self._rightIcon = Icon('icons/caret-grey-right')
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Updates
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _maximise(self):
"""Refresh the table on a maximise event
"""
self._updateTable()
[docs] def updateRestraintTables(self, restraintTables):
"""Update the selected restraint lists from the parent module
"""
self._restraintTables = restraintTables
self._updateTable()
[docs] def updateOutputTables(self, outputTables):
"""Update the selected data lists from the parent module
"""
self._outputTables = outputTables
self._updateTable()
def _updateTable(self, useSelectedPeakList=True, peaks=None, peakList=None):
"""Display the restraints on the table for the selected PeakList.
Obviously, If the restraint has not been previously deleted and flagged isDeleted
"""
self._selectedPeakList = self.project.getByPid(self.pLwidget.getText())
self._groups = None
self.hide()
if useSelectedPeakList:
if self._selectedPeakList:
self.populateTable(rowObjects=self._selectedPeakList.peaks,
# columnDefs=self._columns,
selectedObjects=self.current.peaks)
else:
self.clear()
else:
if peaks:
if peakList:
self.populateTable(rowObjects=peaks,
# columnDefs=self._columns,
selectedObjects=self.current.peaks)
else:
self.clear()
self.updateTableExpanders()
self.show()
def _selectPeakList(self, peakList=None):
"""Manually select a PeakList from the pullDown
"""
if peakList is None:
self.pLwidget.selectFirstItem()
else:
if not isinstance(peakList, PeakList):
logger.warning('select: Object is not of type PeakList')
raise TypeError('select: Object is not of type PeakList')
else:
for widgetObj in self.pLwidget.textList:
if peakList.pid == widgetObj:
self._selectedPeakList = peakList
self.pLwidget.select(self._selectedPeakList.pid)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Widgets callbacks
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _getPullDownSelection(self):
return self.pLwidget.getText()
def _actionCallback(self, data, *args):
"""If current strip contains the double clicked peak will navigateToPositionInStrip
"""
from ccpn.ui.gui.lib.StripLib import navigateToPositionInStrip, _getCurrentZoomRatio
# multiselection table will return a list of objects
objs = data[CallBack.OBJECT]
if not objs:
return
if isinstance(objs, (tuple, list)):
peak = objs[0]
else:
peak = objs
if self.current.strip is not None:
validPeakListViews = [pp.peakList for pp in self.current.strip.peakListViews if isinstance(pp.peakList, PeakList)]
if peak and peak.peakList in validPeakListViews:
widths = None
if peak.peakList.spectrum.dimensionCount <= 2:
widths = _getCurrentZoomRatio(self.current.strip.viewRange())
navigateToPositionInStrip(strip=self.current.strip,
positions=peak.position,
axisCodes=peak.axisCodes,
widths=widths)
else:
logger.warning('Impossible to navigate to peak position. Set a current strip first')
def _selectionCallback(self, data, *args):
"""
set as current the selected peaks on the table
"""
peaks = data[CallBack.OBJECT]
if peaks is None:
self.current.clearPeaks()
self.current.clearRestraints()
else:
self.current.peaks = peaks
self.current.restraints = list(set(res for pk in peaks
for res in pk.restraints if res and res.restraintTable in self._restraintTables))
def _pulldownPLcallback(self, data):
self._updateTable()
def _processDroppedItems(self, data):
"""CallBack for Drop events
"""
return
def _expandAll(self, expand):
"""Expand/collapse all groups
"""
self.updateTableExpanders(expand)
[docs] def updateAutoExpand(self, expand):
"""Set the auto-expand/collapsed state for adding new restraintTables, or sorting table
"""
self._autoExpand = expand
[docs] def updateMeanLowerLimit(self, value):
"""Set the lower limit for visible restraints
"""
self._meanLowerLimit = value
self._updateTable()
def _updateSettings(self, meanLowerLimit, expand):
self._meanLowerLimit = meanLowerLimit
self._autoExpand = expand
def _selectOnTableCurrentPeaksNotifierCallback(self, data):
"""
Callback from a notifier to highlight the peaks on the peak table
:param data:
"""
currentPeaks = data['value']
self._selectOnTableCurrentPeaks(currentPeaks)
def _selectOnTableCurrentPeaks(self, currentPeaks):
"""
Highlight the list of peaks on the table
:param currentPeaks:
"""
self.highlightObjects(currentPeaks)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Subclass GuiTable
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def populateTable(self, rowObjects=None, columnDefs=None,
selectedObjects=None):
"""Populate the table with a set of objects to highlight, or keep current selection highlighted
with the first item visible.
Use selectedObjects = [] to clear the selected items
:param rowObjects: list of objects to set each row
"""
self.project.blankNotification()
# if nothing passed in then keep the current highlighted objects
objs = selectedObjects if selectedObjects is not None else self.getSelectedObjects()
try:
_dataFrameObject = self.getDataFrameFromExpandedList(table=self,
buildList=rowObjects,
expandColumn='Restraint')
# populate from the Pandas dataFrame inside the dataFrameObject
self.setTableFromDataFrameObject(dataFrameObject=_dataFrameObject, columnDefs=self._columns)
except Exception as es:
getLogger().warning('Error populating table', str(es))
raise es
finally:
self._highLightObjs(objs)
self.project.unblankNotification()
[docs] def setTableFromDataFrameObject(self, dataFrameObject, columnDefs=None):
"""Populate the table from a Pandas dataFrame
"""
with self._tableBlockSignals('setTableFromDataFrameObject'):
# get the currently selected objects
objs = self.getSelectedObjects()
self._dataFrameObject = dataFrameObject
with self._guiTableUpdate(dataFrameObject):
if not dataFrameObject.dataFrame.empty:
self.setData(dataFrameObject.dataFrame.values)
self._updateGroups(dataFrameObject.dataFrame)
else:
# set a dummy row of the correct length
self.setData([list(range(len(dataFrameObject.headings)))])
self._groups = None
if columnDefs:
for col, colFormat in enumerate(columnDefs.formats):
if colFormat is not None:
self.setFormat(colFormat, column=col)
# highlight them back again
self._highLightObjs(objs)
def _highLightObjs(self, selection, scrollToSelection=True):
# skip if the table is empty
if not self._dataFrameObject:
return
with self._tableBlockSignals('_highLightObjs'):
selectionModel = self.selectionModel()
model = self.model()
itm = self.currentItem()
selectionModel.clearSelection()
if selection:
if len(selection) > 0:
if isinstance(selection[0], pd.Series):
# not sure how to handle this
return
uniqObjs = set(selection)
_peakObjects = tuple(_getValueByHeader(row, 3) for row in self._dataFrameObject.objects)
rows = [self._dataFrameObject.find(self, str(obj.pid), column='_object', multiRow=True) for obj in uniqObjs]
# if obj in _peakObjects and obj.peakList == self._selectedPeakList]
rows = [row for row in set(makeIterableList(rows)) if row is not None]
if rows:
rows.sort(key=lambda c: int(c))
# remember the current cell so that cursor work correctly
if itm and itm.row() in rows:
self.setCurrentItem(itm)
_row = itm.row()
else:
_row = rows[0]
rowIndex = model.index(_row, 0)
self.setCurrentIndex(rowIndex)
for row in rows:
if row != _row:
rowIndex = model.index(row, 0)
selectionModel.select(rowIndex, selectionModel.Select | selectionModel.Rows)
if scrollToSelection and not self._scrollOverride:
self.scrollToSelectedIndex()
[docs] def getDataFrameFromExpandedList(self, table=None,
buildList=None,
colDefs=None,
expandColumn=None):
"""
Return a Pandas dataFrame from an internal list of objects
The columns are based on the 'func' functions in the columnDefinitions
:param buildList:
:param colDefs:
:return pandas dataFrameObject:
"""
allItems = []
# building...
# _buildColumns = [(HeaderIndex, lambda pk, rt: pk.serial),
# (HeaderPeak, lambda pk, rt: pk.pid),
# (HeaderObject, lambda pk, rt: (pk, rt)),
# ]
_restraintColumns = [(HeaderRestraint, lambda rt: ''),
(HeaderAtoms, lambda rt: ''),
(HeaderTarget, lambda rt: 0.0),
(HeaderLowerLimit, lambda rt: 0.0),
(HeaderUpperLimit, lambda rt: 0.0),
(HeaderMin, lambda rt: 0.0),
(HeaderMax, lambda rt: 0.0),
(HeaderMean, lambda rt: 0.0),
(HeaderStd, lambda rt: 0.0),
(HeaderCount1, lambda rt: 0.0),
(HeaderCount2, lambda rt: 0.0),
]
# define self._columns here
# create the column objects
_cols = [
(HeaderIndex, lambda row: _getValueByHeader(row, HeaderIndex), 'TipTex1', None, None),
(HeaderPeak, lambda row: _getValueByHeader(row, HeaderPeak), 'TipTex2', None, None),
(HeaderObject, lambda row: _getValueByHeader(row, HeaderObject), 'TipTex3', None, None),
]
if len(self._restraintTables) > 0:
# _buildColumns.append((HeaderExpand, lambda pk, rt: self._downIcon))
_cols.append((HeaderExpand, lambda row: None, 'TipTex4', None, None))
# for col in range(len(self._restraintTables)):
# for _colID in (HeaderRestraint, HeaderAtoms, HeaderMin, HeaderMax, HeaderMean, HeaderStd, HeaderCount1, HeaderCount2):
# _cols.append((f'{_colID}_{col + 1}', lambda row: _getValueByHeader(row, f'{_colID}_{col + 1}'), f'{_colID}_Tip{col + 1}', None, None))
#
# # _cols.append((f'Restraint{col + 1}', lambda row: _getValueByHeader(row, f'Restraint{col + 1}'), f'RTipTex{col + 1}', None, None))
# # _cols.append((f'Atoms{col + 1}', lambda row: _getValueByHeader(row, f'Atoms{col + 1}'), f'ATipTex{col + 1}', None, None))
# # _cols.append((f'Violation{col + 1}', lambda row: _getValueByHeader(row, f'Violation{col + 1}'), f'VTipTex{col + 1}', None, None))
#
# self._columns = ColumnClass(_cols)
# if buildList:
# for col, obj in enumerate(buildList):
#
# # ids = pd.DataFrame({'#': [pk.serial for pk in buildList], 'Peak': [pk.pid for pk in buildList], '_object': [pk for pk in buildList], 'Expand': [None for pk in buildList]})
# # df1 = pd.DataFrame([(pk, res) for pk in buildList for res in pk.restraints if res.restraintTable == rl], columns=['Peak', 'Pid_1'])
# # rl1 = pd.merge(ids['Peak'], df1, how='right')
#
# if not obj.restraints or len(self._restraintTables) < 1:
# listItem = OrderedDict()
# for headerText, func in _buildColumns:
# listItem[headerText] = func(obj, None)
# allItems.append(listItem)
#
# else:
# _restraints = obj.restraints
# listItem = OrderedDict()
# for headerText, func in _buildColumns:
# listItem[headerText] = func(obj, None)
#
# _resLists = OrderedDict([(res, []) for res in self._restraintTables])
#
# # get the result from the dataSet.data
# # rl = self._restraintTables[0]; rl.structureData.data[0].dataParameters.get('results')
# #
# # rename the columns to match
# # viols.columns = [col+'_{ii+1}' for col in viols.columns]
# #
# # pd.merge(blank, viols, on=['Pid_1', 'Atoms_1'], how='left').fillna(0.0)
#
# for _res in _restraints:
# if _res and _res.restraintTable in _resLists:
# _atoms = self._getContributions(_res)
# for _atom in _atoms:
# _resLists[_res.restraintTable].append((_res, _atom, 0.0))
#
# for val in zip_longest(*_resLists.values()):
# copyItem = listItem.copy()
# for cc, rr in enumerate(val):
# copyItem[f'{HeaderRestraint}_{cc + 1}'] = rr[0].pid if rr else ''
# copyItem[f'{HeaderAtoms}_{cc + 1}'] = rr[1] if rr else ''
# # for _colID in (HeaderMin, HeaderMax, HeaderMean, HeaderStd, HeaderCount1, HeaderCount2):
# # copyItem[f'{_colID}_{cc + 1}'] = rr[2] if rr else 0
#
# allItems.append(copyItem)
#
# pass
#
# _dataFrame = DataFrameObject(dataFrame=pd.DataFrame(allItems, _columns=self._columns.headings),
# # objectList=objects or [],
# # indexList=indexList,
# columnDefs=self._columns or [],
# table=table,
# )
# _objects = [row for row in _dataFrame.dataFrame.itertuples()]
# _dataFrame._objects = _objects
#
# return _dataFrame
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~`
# print('Create violation dataFrames')
def _getContributions(restraint):
return [' - '.join(sorted(ri)) for rc in restraint.restraintContributions
for ri in rc.restraintItems]
# get the target peakLists
pks = buildList
resLists = self._restraintTables
if resLists:
# make references for quicker access later
contribs = {res: _getContributions(res) for rList in resLists for res in rList.restraints}
# make a dict of peak.restraints as this is reverse generated by the api every call to peak.restraints
# pkRes = {}
pkRestraints = {}
for resList in resLists:
for res in resList.restraints:
for pk in res.peaks:
pkRestraints.setdefault(pk.serial, set()).add(res)
# pkRes.setdefault(pk, {})
# pkRes[pk].setdefault(resList, set()).add(res)
# get the maximum number of restraintItems from each restraint list
counts = [np.array([sum([
len(contribs[res]) for res in (pkRestraints.get(pk.serial) or ()) if res and res.restraintTable == rList
])
for pk in pks])
for rList in resLists]
maxCount = np.max(counts, axis=0)
# print(f' max counts {list(maxCount)}')
# maxCount += 1
# allPks = pd.DataFrame([(pk.serial, pk, None) for pk, count in zip(pks, maxCount) for rr in range(count)], columns=['Peak', '_object', 'Expand'])
allPkSerials = pd.DataFrame([pk.serial for pk, count in zip(pks, maxCount) for rr in range(count)], columns=['PeakSerial', ])
index = pd.DataFrame([ii for ii in range(1, len(allPkSerials) + 1)], columns=['index', ])
allPks = pd.DataFrame([(pk.serial, pk.pid, self._downIcon) for pk, count in zip(pks, maxCount) for rr in range(count)], columns=['PeakSerial', '_object', 'Expand'])
# make matching length tables for each of the restraintTables for each peak so the rows match up in the table
dfs = {}
for lCount, rl in enumerate(resLists):
ll = [(None, None)] * sum(maxCount)
head = 0
for pk, cc, maxcc in zip(pks, counts[lCount], maxCount):
# ensure that the atoms are sorted so that they are matched correctly
_res = [(res.pid, ' - '.join(sorted(_atom.split(' - '), key=universalSortKey)) if _atom else None)
for res in (pkRestraints.get(pk.serial) or ()) if res.restraintTable == rl
for _atom in contribs[res]]
if _res:
ll[head:head + len(_res)] = _res
head += maxcc
# put the serial and atoms into another table to be concatenated to the right, lCount = index in resLists
dfs[rl] = pd.concat([allPkSerials,
pd.DataFrame(ll, columns=[f'{HeaderRestraint}_{lCount + 1}',
f'Atoms_{lCount + 1}'])], axis=1)
# # get the dataSets that contain data with a matching 'result' name - should be violations
# violationResults = {resList: viols.copy() if viols is not None else None
# for resList in resLists
# for data in resList.structureData.data if resList.name == data.name
# for k, viols in data.dataParameters.items() if k == 'results'}
# print(f' {self._outputTables}')
# get the dataSets that contain data with a matching 'result' name - should be violations
violationResults = {resList: viols.data.copy() if viols is not None else None
for resList in resLists
for viols in self._outputTables
if resList.pid == viols.getMetadata(_RESTRAINTTABLE) and viols.getMetadata(_VIOLATIONRESULT) is True
}
if violationResults:
# rename the columns to match the order in visible list - number must match the position in the selected restraintTables
for ii, (k, resViol) in enumerate(violationResults.items()):
ind = resLists.index(k)
# change old columns to new columns
newCols = [_OLDHEADERS.get(cc, None) or cc for cc in resViol.columns]
# resViol.columns = [vv + f'_{ind + 1}' for vv in resViol.columns]
resViol.columns = [vv + f'_{ind + 1}' for vv in newCols]
# merge all the tables for each restraintTable
_out = [index, allPks]
zeroCols = []
for ii, resList in enumerate(resLists):
# print(f' resList {ii} {resList}')
if resList in violationResults:
_left = dfs[resList]
_right = violationResults[resList]
if (f'{HeaderRestraint}_{ii + 1}' in _left.columns and f'Atoms_{ii + 1}' in _left.columns) and \
(f'{HeaderRestraint}_{ii + 1}' in _right.columns and f'Atoms_{ii + 1}' in _right.columns):
_new = pd.merge(_left, _right, on=[f'{HeaderRestraint}_{ii + 1}', f'Atoms_{ii + 1}'], how='left').drop(columns=['PeakSerial']).fillna(0.0)
_out.append(_new)
zeroCols.append(f'{HeaderMean}_{ii + 1}')
for _colID in (HeaderRestraint, HeaderAtoms,
HeaderTarget, HeaderLowerLimit, HeaderUpperLimit,
HeaderMin, HeaderMax, HeaderMean, HeaderStd,
HeaderCount1, HeaderCount2):
if f'{_colID}_{ii + 1}' in list(_right.columns):
# check whether all the columns exist - discard otherwise
# columns should have been renamed and post-fixed with _<num>. above
_cols.append((f'{_colID}_{ii + 1}', lambda row: _getValueByHeader(row, f'{_colID}_{ii + 1}'), f'{_colID}_Tip{ii + 1}', None, None))
else:
# lose the PeakSerial column for each
_new = dfs[resList].drop(columns=['PeakSerial']).fillna(0.0)
_out.append(_new)
# creat new column headings
for _colID in (HeaderRestraint, HeaderAtoms):
_cols.append((f'{_colID}_{ii + 1}', lambda row: _getValueByHeader(row, f'{_colID}_{ii + 1}'), f'{_colID}_Tip{ii + 1}', None, None))
# concatenate the final dataFrame
# _table = pd.concat([index, allPks, *_out.values()], axis=1)
_table = pd.concat(_out, axis=1)
# # purge all rows that contain all means == 0, fastest method
# _table = _table[np.count_nonzero(_table[zeroCols].values, axis=1) > 0]
# process all row that have means > 0.3, keep only rows that contain at least one valid mean
_table = _table[(_table[zeroCols] >= self._meanLowerLimit).sum(axis=1) > 0]
else:
# only show the restraints
_out = [index, allPks]
# no results - just show the table
for ii, resList in enumerate(resLists):
# print(f' resList {ii} {resList}')
# lose the PeakSerial column for each
_new = dfs[resList].drop(columns=['PeakSerial']).fillna(0.0)
_out.append(_new)
# creat new column headings
for _colID in (HeaderRestraint, HeaderAtoms):
_cols.append((f'{_colID}_{ii + 1}', lambda row: _getValueByHeader(row, f'{_colID}_{ii + 1}'), f'{_colID}_Tip{ii + 1}', None, None))
# concatenate to give the final table
_table = pd.concat(_out, axis=1)
else:
# make a table that only has peaks
index = pd.DataFrame([ii for ii in range(1, len(pks) + 1)], columns=['index'])
allPks = pd.DataFrame([(pk.serial, pk.pid, self._downIcon) for pk in pks], columns=['PeakSerial', '_object', 'Expand'])
_table = pd.concat([index, allPks], axis=1)
# set the table _columns
self._columns = ColumnClass(_cols)
# set the table from the dataFrame
_dataFrame = DataFrameObject(dataFrame=_table,
columnDefs=self._columns or [],
table=table,
)
# extract the row objects from the dataFrame
_objects = [row for row in _dataFrame.dataFrame.itertuples()]
_dataFrame._objects = _objects
return _dataFrame
[docs] def refreshTable(self):
# subclass to refresh the groups
self.setTableFromDataFrameObject(self._dataFrameObject)
self.updateTableExpanders()
[docs] def setDataFromSearchWidget(self, dataFrame):
"""Set the data for the table from the search widget
"""
self.setData(dataFrame.values)
self._updateGroups(dataFrame)
self.updateTableExpanders()
@staticmethod
def _getContributions(restraint):
"""
CCPN-INTERNAL: Return number of peaks assigned to NmrAtom in Experiments and PeakLists
using ChemicalShiftList
"""
return [' - '.join(ri) for rc in restraint.restraintContributions for ri in rc.restraintItems]
# if len(restraint.restraintContributions) > 0:
# if restraint.restraintContributions[0].restraintItems:
# # return restraint.restraintContributions[0].restraintItems[0]
# return [' - '.join(rr) for rr in restraint.restraintContributions[0].restraintItems]
# else:
# return ''
@staticmethod
def _getSortedContributions(restraint):
"""
CCPN-INTERNAL: Return number of peaks assigned to NmrAtom in Experiments and PeakLists
using ChemicalShiftList
"""
return [sorted(ri) for rc in restraint.restraintContributions for ri in rc.restraintItems if ri]
def _updateGroups(self, df):
self._groups = {}
# collate max/min information
for row in df.itertuples(index=False):
name = str(row[self.PIDCOLUMN])
if name not in self._groups:
_row = tuple(universalSortKey(x) for x in row)
self._groups[name] = {'min': _row, 'max': _row}
else:
self._groups[name]['min'] = tuple(map(lambda x, y: min(universalSortKey(x), y), row, self._groups[name]['min']))
self._groups[name]['max'] = tuple(map(lambda x, y: max(universalSortKey(x), y), row, self._groups[name]['max']))
[docs] def onSectionClicked(self, *args):
"""Respond to reordering the table
"""
self.updateTableExpanders()
[docs] def updateTableExpanders(self, expandState=None):
"""Update the state of the expander buttons
"""
if not isinstance(expandState, (bool, type(None))):
raise TypeError('expandState must be bool or None')
if self.EXPANDERCOLUMN >= self.columnCount():
return
rows = self.rowCount()
_order = [self.indexFromItem(self.item(ii, self.MERGECOLUMN)).data() for ii in range(rows)]
if not _order:
return
self.clearSpans()
row = rowCount = 0
lastRow = _order[row]
_expand = self._autoExpand if expandState is None else expandState
for i in range(0, rows):
nextRow = _order[i + 1] if i < (rows - 1) else None # catch the last group, otherwise need try/except
if lastRow == nextRow:
rowCount += 1
elif rowCount > 0:
for col in self.SPANCOLUMNS:
self.setSpan(row, col, rowCount + 1, 1)
self.setRowHidden(row, False)
_widg = self.cellWidget(row, self.EXPANDERCOLUMN)
_widg.updateCellWidget(row, True, setPixMapState=_expand)
for rr in range(row + 1, row + rowCount):
self.setRowHidden(rr, not _expand)
_widg = self.cellWidget(rr, self.EXPANDERCOLUMN)
_widg.updateCellWidget(rr, False)
self.setRowHidden(row + rowCount, not _expand)
_widg = self.cellWidget(row + rowCount, self.EXPANDERCOLUMN)
_widg.updateCellWidget(row + rowCount, False)
rowCount = 0
row = i + 1
else:
self.setRowHidden(i, False)
_widg = self.cellWidget(i, self.EXPANDERCOLUMN)
_widg.updateCellWidget(i, False)
row = i + 1
lastRow = nextRow
self.resizeRowsToContents()
def _showOnMolecularViewer(self):
pdbPath = None
selectedPeaks = self.getSelectedObjects() or []
## get the restraints to display
restraints = flattenLists([pk.restraints for pk in selectedPeaks])
## get the PDB file from the parent restraintTable.
for rs in restraints:
if rs.restraintTable.structureData.moleculeFilePath:
pdbPath = rs.restraintTable.structureData.moleculeFilePath
getLogger().info('Using pdb file %s for displaying violation on Molecular viewer.' % pdbPath)
break
## run Pymol
pymolScriptPath = joinPath(self.moduleParent.pymolScriptsPath, PymolScriptName)
if pdbPath is None:
MessageDialog.showWarning('No Molecule File found',
'''To add a molecule file path: Find the StructureData on sideBar,
open the properties popup, add a full PDB file path in the entry widget.''')
return
pymolScriptPath = pyMolUtil._restraintsSelection2PyMolFile(pymolScriptPath, pdbPath, restraints)
pyMolUtil.runPymolWithScript(self.application, pymolScriptPath)
[docs]def main():
# show the empty module
from ccpn.ui.gui.widgets.Application import newTestApplication
from ccpn.framework.Application import getApplication
# create a new test application
app = newTestApplication(interface='Gui')
application = getApplication()
mainWindow = application.ui.mainWindow
# add the module to mainWindow
_module = RestraintAnalysisTableModule(mainWindow=mainWindow)
mainWindow.moduleArea.addModule(_module)
# show the mainWindow
app.start()
if __name__ == '__main__':
main()