-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLoopSage_md.py
164 lines (145 loc) · 7.49 KB
/
LoopSage_md.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#########################################################################
########### CREATOR: SEBASTIAN KORSAK, WARSAW 2022 ######################
#########################################################################
import copy
import time
import numpy as np
import openmm as mm
import openmm.unit as u
from tqdm import tqdm
from sys import stdout
from mdtraj.reporters import HDF5Reporter
from scipy import ndimage
from openmm.app import PDBFile, PDBxFile, ForceField, Simulation, PDBReporter, PDBxReporter, DCDReporter, StateDataReporter, CharmmPsfFile
from LoopSage_utils import *
class MD_LE:
def __init__(self,M,N,N_beads,burnin,MC_step,path,platform):
'''
M, N (np arrays): Position matrix of two legs of cohesin m,n.
Rows represent loops/cohesins and columns represent time
N_beads (int): The number of beads of initial structure.
step (int): sampling rate
path (int): the path where the simulation will save structures etc.
'''
self.M, self.N = M, N
self.N_coh, self.N_steps = M.shape
self.N_beads, self.step, self.burnin = N_beads, MC_step, burnin//MC_step
self.path = path
self.platform = platform
def run_pipeline(self,run_MD=True,sim_step=5,write_files=False,plots=False):
'''
This is the basic function that runs the molecular simulation pipeline.
Input parameters:
run_MD (bool): True if user wants to run molecular simulation (not only energy minimization).
sim_step (int): the simulation step of Langevin integrator.
write_files (bool): True if the user wants to save the structures that determine the simulation ensemble.
plots (bool): True if the user wants to see the output average heatmaps.
'''
# Define initial structure
print('Building initial structure...')
points = self_avoiding_random_walk(self.N_beads, 2, 0.001)
write_mmcif(points,self.path+'/LE_init_struct.cif')
generate_psf(self.N_beads,self.path+'/other/LE_init_struct.psf')
print('Done brother ;D\n')
# Define System
pdb = PDBxFile(self.path+'/LE_init_struct.cif')
forcefield = ForceField('forcefields/classic_sm_ff.xml')
self.system = forcefield.createSystem(pdb.topology, nonbondedCutoff=1*u.nanometer)
integrator = mm.LangevinIntegrator(310, 0.05, 100 * mm.unit.femtosecond)
# Add forces
print('Adding forces...')
self.add_forcefield()
print('Forces added ;)\n')
# Minimize energy
print('Minimizing energy...')
platform = mm.Platform.getPlatformByName(self.platform)
self.simulation = Simulation(pdb.topology, self.system, integrator, platform)
self.simulation.reporters.append(StateDataReporter(stdout, (self.N_steps*sim_step)//10, step=True, totalEnergy=True, potentialEnergy=True, temperature=True))
self.simulation.reporters.append(DCDReporter(self.path+'/other/stochastic_LE.dcd', 5))
self.simulation.context.setPositions(pdb.positions)
current_platform = self.simulation.context.getPlatform()
print(f"Simulation will run on platform: {current_platform.getName()}")
self.simulation.minimizeEnergy()
print('Energy minimization done :D\n')
# Run molecular dynamics simulation
if run_MD:
print('Running molecular dynamics (wait for 10 steps)...')
start = time.time()
heats = list()
for i in range(1,self.N_steps):
self.change_loop(i)
self.simulation.step(sim_step)
if i%self.step==0 and i>self.burnin*self.step:
self.state = self.simulation.context.getState(getPositions=True)
if write_files: PDBxFile.writeFile(pdb.topology, self.state.getPositions(), open(self.path+f'/pdbs/MDLE_{i//self.step-self.burnin}.cif', 'w'))
save_path = self.path+f'/heatmaps/heat_{i//self.step-self.burnin}.svg' if write_files else None
heats.append(get_heatmap(self.state.getPositions(),save_path=save_path,save=write_files))
time.sleep(5)
end = time.time()
elapsed = end - start
print(f'Everything is done! Simulation finished succesfully!\nMD finished in {elapsed/60:.2f} minutes.\n')
self.avg_heat = np.average(heats,axis=0)
self.std_heat = np.std(heats,axis=0)
np.save(self.path+f'/other/avg_heatmap.npy',self.avg_heat)
np.save(self.path+f'/other/std_heatmap.npy',self.std_heat)
if plots:
self.plot_heat(self.avg_heat,f'/plots/avg_heatmap.svg')
self.plot_heat(self.std_heat,f'/plots/std_heatmap.svg')
return self.avg_heat
def change_loop(self,i):
force_idx = self.system.getNumForces()-1
self.system.removeForce(force_idx)
self.add_loops(i)
self.simulation.context.reinitialize(preserveState=True)
self.LE_force.updateParametersInContext(self.simulation.context)
def add_evforce(self):
'Leonard-Jones potential for excluded volume'
self.ev_force = mm.CustomNonbondedForce('epsilon*((sigma1+sigma2)/(r+r_small))')
self.ev_force.addGlobalParameter('epsilon', defaultValue=10)
self.ev_force.addGlobalParameter('r_small', defaultValue=0.01)
self.ev_force.addPerParticleParameter('sigma')
for i in range(self.N_beads):
self.ev_force.addParticle([0.05])
self.system.addForce(self.ev_force)
def add_bonds(self):
'Harmonic bond borce between succesive beads'
self.bond_force = mm.HarmonicBondForce()
for i in range(self.N_beads - 1):
self.bond_force.addBond(i, i + 1, 0.1, 3e5)
self.system.addForce(self.bond_force)
def add_stiffness(self):
'Harmonic angle force between successive beads so as to make chromatin rigid'
self.angle_force = mm.HarmonicAngleForce()
for i in range(self.N_beads - 2):
self.angle_force.addAngle(i, i + 1, i + 2, np.pi, 200)
self.system.addForce(self.angle_force)
def add_loops(self,i=0):
'LE force that connects cohesin restraints'
self.LE_force = mm.HarmonicBondForce()
for nn in range(self.N_coh):
self.LE_force.addBond(self.M[nn,i], self.N[nn,i], 0.1, 5e4)
self.system.addForce(self.LE_force)
def add_forcefield(self):
'''
Here is the definition of the forcefield.
There are the following energies:
- ev force: repelling LJ-like forcefield
- harmonic bond force: to connect adjacent beads.
- angle force: for polymer stiffness.
- loop forces: this is a list of force objects. Each object corresponds to a different cohesin. It is needed to define a force for each time step.
'''
self.add_evforce()
self.add_bonds()
self.add_stiffness()
self.add_loops()
def plot_heat(self,img,file_name):
figure(figsize=(10, 10))
plt.imshow(img,cmap="Reds",vmax=1)
plt.savefig(self.path+file_name,format='svg',dpi=500)
plt.close()
def main():
# A potential example
M = np.load('/home/skorsak/Dropbox/LoopSage/files/region_[48100000,48700000]_chr3/Annealing_Nbeads500_ncoh50/Ms.npy')
N = np.load('/home/skorsak/Dropbox/LoopSage/files/region_[48100000,48700000]_chr3/Annealing_Nbeads500_ncoh50/Ns.npy')
md = MD_LE(4*M,4*N,2000,5,1)
md.run_pipeline(write_files=False,plots=True,sim_step=100)