Source code for ccpn.AnalysisScreen.lib.SimulatedAnnealing

#  Part of the Simulated Annealing algorithm is inspired by NMRmix.
#  As pubblished in J.L. Stark, et al. NMRmix: a tool for the optimization of compound mixtures in 1D (1)H NMR ligand affinity screens
#  J Proteome Res, 15 (2016), pp. 1360–1368

import math
import random
import copy
import pandas as pd
import numpy as np
import datetime
from itertools import combinations
from numba import jit
from ccpn.util.Logging import getLogger

[docs]def randomDictMixtures(name, compounds, nMixtures): mixturesDict = {} n,b,mixtures = len(compounds),0,[] compounds = list(compounds) compounds = random.sample(compounds, len(compounds)) for k in range(nMixtures): a, b = b, b + (n+k)//nMixtures key = str(name)+'-'+str(k+1) value = compounds[a:b] mixturesDict.update({key:value}) return mixturesDict
def _getScore(overlapCount, compoundPeaksCount, scalingFactor=1): try: return (scalingFactor * (overlapCount / compoundPeaksCount)) except ZeroDivisionError: return 0 def _getOverlapCount(compoundA, compoundB, minDist): "for refactoring" a, b = np.array(compoundA), np.array(compoundB) c = abs(b[:, None] - a) d = np.argwhere(c < minDist) overlapCount = d.shape[0] return overlapCount
[docs]def scoreCompound(compoundA, compoundB, minDist, scalingFactor=1): overlapCount = _getOverlapCount(compoundA, compoundB, minDist) compoundPeaksCount = len(compoundB) score = _getScore(overlapCount, compoundPeaksCount, scalingFactor) return score
[docs]def scoreCompound_OLD(compoundA, compoundB, minDist, scalingFactor=1): """ OLD slow keep for backup""" overlapCount = 0 for shiftA in compoundA: for shiftB in compoundB: if abs(shiftA - shiftB) < minDist: overlapCount += 1 continue try: return(scalingFactor * (overlapCount / len(compoundA))) except ZeroDivisionError: return 0
[docs]def scoreMixture_(mixture, minDist): "for refactoring" l = list(combinations(mixture, 2)) score = np.sum([scoreCompound(i[0][1],i[1][1], minDist=minDist) for i in l]) return score
[docs]def scoreMixture(mixture, minDist): score = 0 for compoundA in mixture: for compoundB in mixture: if compoundA[0] is not compoundB[0]: score += scoreCompound(compoundA[1], compoundB[1], minDist=minDist) return score
[docs]def calculateTotalScore(mixturesDict, peaksDistance=0.01): score = 0 if mixturesDict is None: return 0 for mixture in mixturesDict: score += scoreMixture(mixturesDict[mixture], peaksDistance) return score
# def getAllOverlappedPositions(mixtures, minDist): # # for mixtureName, compounds in mixtures.items(): # for compound in compounds: # compoundName, compoundPeakList = compound # compoundsToCompare = [c[1] for c in compounds if c[0] != compoundName] # overlaped = calculateOverlapCount(compoundPeakList, compoundsToCompare, minDist ) # if overlaped is None: # getLogger().info(compoundName, 'No Overlapped peaks found') # else: # score = len(overlaped) / len(compoundPeakList) # # getLogger().info(compoundName, ' --> Counts',len(list(set(overlaped))), ' --> Overlapped positions:', overlaped, 'score: -->', score)
[docs]def getOverlappedCount(mixtureCompounds, minDist=0.01): "called from score single sample" # TODO get rid of . Duplicated routine, totalOverlapped = 0 overlaps = [] for compound in mixtureCompounds: compoundName, compoundPeakList = compound compoundsToCompare = [c[1] for c in mixtureCompounds if c[0] != compoundName] overlaped = calculateOverlapCount(compoundPeakList, compoundsToCompare, minDist) if overlaped is None: # getLogger().info(compoundName, 'No Overlapped peaks found') continue else: totalOverlapped += len(list(set(overlaped))) overlaps.append(list(set(overlaped))) # getLogger().info('Component %s, overlaps at positions: %s ' %(compoundName, list(set(overlaped)))) totalOverlapped = len(list(set([o for j in overlaps for o in j]))) # getLogger().info('Total Overlaps for Mixture ', totalOverlapped) return totalOverlapped
[docs]def calculateOverlapCount(compoundA, mixture, minimalOverlap): "called from score single sampleComponent and sample" # TODO get rid of . Duplicated routine, ll = [] for compound in mixture: overlaped = [peakA for peak in compound for peakA in compoundA if abs(peak - peakA) <= minimalOverlap] if len(overlaped)>0: ll.append(overlaped) # return overlaped return [j for l in ll for j in l]
[docs]def findBestMixtures(mixturesSteps): bestMixturesStep = list(mixturesSteps.items()) if len(bestMixturesStep) > 0: bestMixtures = min(bestMixturesStep)[1] return bestMixtures
[docs]def mixTwoMixturesDict(mixtures): mixturesList = list(mixtures.items()) sampledMixtures = random.sample(mixturesList, 2) mixturesToMix = dict(sampledMixtures) swaps = [] for i, mixture in enumerate(list(mixturesToMix.values())): randInt = random.randint(0, len((mixture)[i]) - 1) pick = list(mixturesToMix.values())[i][randInt] list(mixturesToMix.values())[i].remove(pick) swaps.append(pick) for i, mixture in enumerate(list(mixturesToMix.values())): pick = swaps.pop() list(mixturesToMix.values())[i].append(pick) for m in sampledMixtures: mixturesList.remove(m) mixedMixtures = mixturesList + list(mixturesToMix.items()) return dict(mixedMixtures)
[docs]@jit(nopython=True) def getLinearSteps(startTemp=1000.0, finalTemp = 0.1, maxSteps = 1000): tSteps = (startTemp - finalTemp) / maxSteps # 1) temp = startTemp while True: yield temp temp -= tSteps
[docs]@jit(nopython=True) def getExponentialSteps(startTemp=1000.0, finalTemp = 0.1, maxSteps = 100): aTemp = math.exp((math.log(finalTemp / startTemp)) / maxSteps) temp = startTemp while True: yield temp temp *= aTemp
[docs]def runCooling(type, startTemp=1000.0, finalTemp = 0.1, maxSteps = 1000): if type == 'exponential': coolingSchedule = getExponentialSteps(startTemp, finalTemp, maxSteps) elif type == 'Linear': coolingSchedule = getLinearSteps(startTemp, finalTemp, maxSteps) else: getLogger().info('Cooling type not implemented yet, Used linear instead') coolingSchedule = getLinearSteps() return coolingSchedule
[docs]@jit(nopython=True) def getProbability(scoreDiff, currentTemp, tempK): if currentTemp == 0: return 0 k = tempK probabilty = math.exp(-(scoreDiff)*k / ( currentTemp)) return probabilty
from tqdm import tqdm
[docs]def annealling(mixtures, coolingMethod='Linear', startTemp=1000, finalTemp=0.01, maxSteps=1000, tempK=200, minDistance=0.01): mixturesSteps = {} coolingSchedule = runCooling(coolingMethod, startTemp, finalTemp, maxSteps) currScore = calculateTotalScore(mixtures,minDistance) scores = [] probabilities = [] step = 1 for currentTemp in tqdm(coolingSchedule): newMixtures = mixTwoMixturesDict(mixtures) copyNewMixtures = copy.deepcopy(newMixtures) newScore = calculateTotalScore(newMixtures,minDistance) # getLogger().info('Annealing .... : T%s, Score %s' %(currentTemp, newScore,) ) scoreDiff = abs(newScore - currScore) scores.append(newScore) if newScore == 0: return copyNewMixtures elif newScore <= currScore: mixtures = newMixtures currScore = newScore mixturesSteps.update({newScore: copyNewMixtures}) else: probability = getProbability(scoreDiff,currentTemp, tempK) probabilities.append(probability) if random.random() < probability: mixtures = newMixtures currScore = newScore mixturesSteps.update({newScore: copyNewMixtures}) step += 1 if step > maxSteps: break if len(scores)>0: bestScore = min(scores) # getLogger().info('--> bestScore anniling:',bestScore) bestMixtures = findBestMixtures(mixturesSteps) return bestMixtures
[docs]def iterateAnnealing(mixtures, startTemp=1000, finalTemp=0.01, maxSteps=1000, tempK=200, coolingMethod='linear', nIterations=1, minDistance=0.01, minTotalScore = None): getLogger().info('Starting Annealing Iterations ') totalScores = [] bestIteration = {} #score:mixture startingScore = calculateTotalScore(mixtures, minDistance) bestIteration.update({startingScore: mixtures}) if minTotalScore is None: minTotalScore = startingScore # getLogger().info('startingScore', startingScore) copyMixtures = copy.deepcopy(mixtures) if startingScore == 0: return mixtures i = 0 # while i <= nIterations: # for i in range(0,nIterations): newMixtures = annealling(mixtures, coolingMethod=coolingMethod, startTemp=startTemp, finalTemp=finalTemp, maxSteps=maxSteps, tempK=tempK, minDistance=minDistance) copyNewMixtures = copy.deepcopy(newMixtures) if newMixtures: currScore = calculateTotalScore(newMixtures,minDistance) bestIteration.update({currScore: newMixtures}) totalScores.append(currScore) if currScore == 0: return copyNewMixtures j = 0 while currScore >= startingScore or j <= nIterations: getLogger().info('Iteration...:',j) if j == nIterations: getLogger().info('Reached max n of iterations...', j) break newMixtures = annealling(mixtures, coolingMethod=coolingMethod, startTemp=startTemp, finalTemp=finalTemp, maxSteps=maxSteps, tempK=tempK, minDistance=minDistance) currScore = calculateTotalScore(newMixtures, minDistance) totalScores.append(currScore) if currScore <= startingScore: getLogger().info('Iteration: %s . Current score: %s. Starting Score: %s ' %(j, currScore, startingScore)) bestIteration.update({currScore: newMixtures}) startingScore = currScore j += 1 getLogger().info('currScore %s, minTotalScore: %s, j: %s, nIterations:%s '%(currScore, minTotalScore, j, nIterations)) df = pd.DataFrame(totalScores) dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S").replace(' ','_') # df.to_excel('/Users/luca/Desktop/PhD/data/UCB/results/mixtures/allScores__'+dt+'.xlsx') if len(bestIteration)>0: getLogger().info("Annealing Finished") bestMixtures = findBestMixtures(bestIteration) return bestMixtures else: getLogger().info('No Better Iteration found, original mixtures are returned') # sc = calculateTotalScore(copyMixtures, minDistance) return copyMixtures
def _showScoresPerMixture(mixtures, minDistance): scoring = [] for mixtureName, mixCompounds in mixtures.items(): scoring.append(str(mixtureName)+ ' score: ' + str(scoreMixture(mixCompounds, minDistance))) return scoring
[docs]def greedyMixtures(compounds, maxSize, minDistance, maxPeaksOverlapped=None): mixtures = [[]] * len(compounds) for compound in compounds: compoundMixtured = False mixtureCount = 0 while not compoundMixtured: if len(mixtures[mixtureCount]) < maxSize: mixtureScore = scoreMixture(mixtures[mixtureCount], minDistance) trialMixture = mixtures[mixtureCount] + [compound, ] if scoreMixture(trialMixture, minDistance) == mixtureScore: mixtures[mixtureCount] = trialMixture compoundMixtured = True mixtureCount += 1 return [mixture for mixture in mixtures if len(mixture)>0]
############ # Not used. to be removed # def _calculateSingleCompoundScore(compoundA, mixture, minimalOverlap): # for mix, compounds in mixture.items(): # for compound in compounds: # overlaped = [peakA for peak in compound[1] for peakA in compoundA[1] if abs(peak - peakA) <= minimalOverlap] # scoring = len(overlaped) / len(compoundA[1]) # def getMixtureInfo(mixture, minDist): # # for compound in mixture: # compoundName, compoundPeakList = compound # compoundsToCompare = [c[1] for c in mixture if c[0] != compoundName] # overlaped = calculateOverlapCount(compoundPeakList, compoundsToCompare, minDist ) # # if overlaped is None: # getLogger().info(compoundName, 'No Overlapped peaks found') # else: # score = len(overlaped) / len(compoundPeakList) # getLogger().info(compoundName, ' --> Counts',len(list(set(overlaped))), ' --> Overlapped positions:', overlaped, 'score: -->', score) # def getAllOverlappedPositions(mixtures, minDist): # # for mixtureName, compounds in mixtures.items(): # for compound in compounds: # compoundName, compoundPeakList = compound # compoundsToCompare = [c[1] for c in compounds if c[0] != compoundName] # overlaped = calculateOverlapCount(compoundPeakList, compoundsToCompare, minDist ) # if overlaped is None: # getLogger().info(compoundName, 'No Overlapped peaks found') # else: # score = len(overlaped) / len(compoundPeakList) # # getLogger().info(compoundName, ' --> Counts',len(list(set(overlaped))), ' --> Overlapped positions:', overlaped, 'score: -->', score) # # def generateSimpleCompounds(numb, numberOfPeaks, ppmStart, ppmEnd): # compounds = [] # for i in range(numb): # compound = generatePeakPositions(numberOfPeaks, ppmStart, ppmEnd) # compounds.append(compound) # return compounds # def generatePeakPositions(numberOfPeaks, ppmStart, ppmEnd): # peakPositions = [] # for x in range(numberOfPeaks): # peakPositions.append(random.uniform(ppmStart, ppmEnd)) # return sorted(peakPositions)