Fix 47 pleafletfinder #69

Merged: 21 commits, Oct 17, 2018
Changes from 7 commits
pmda/leaflet.py (282 additions, 0 deletions)

@@ -0,0 +1,282 @@

```python
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2018 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
"""
LeafletFinder Analysis tool --- :mod:`pmda.leaflet`
===================================================

This module contains parallel versions of analysis tasks in
:mod:`MDAnalysis.analysis.leaflet`.

.. autoclass:: LeafletFinder
   :members:
   :undoc-members:
   :inherited-members:

"""
from __future__ import absolute_import

import numpy as np
import dask.bag as db
import networkx as nx
from scipy.spatial import cKDTree

import MDAnalysis as mda
from dask import distributed, multiprocessing
from joblib import cpu_count

from .parallel import ParallelAnalysisBase, Timing
from .util import timeit


class LeafletFinder(ParallelAnalysisBase):
    """Parallel Leaflet Finder analysis.

    Identify atoms in the same leaflet of a lipid bilayer.
    This class implements and parallelizes the *LeafletFinder* algorithm
    [Michaud-Agrawal2011]_.
    The parallelization is based on:

    "Task-parallel Analysis of Molecular Dynamics Trajectories",
    Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, George
    Chantzialexiou, Thomas E. Cheatham, Oliver Beckstein, Geoffrey C. Fox
    and Shantenu Jha. 47th International Conference on Parallel
    Processing (ICPP 2018)

    Parameters
    ----------
    universe : :class:`~MDAnalysis.core.groups.Universe`
        a :class:`MDAnalysis.core.groups.Universe` (the `atomgroup` must
        belong to this Universe)
    atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
        atomgroups that are iterated in parallel

    Note
    ----
    At the moment, this class has far fewer features than the serial
    version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`.

    """
```
> **Member:** Additional notes to add:
>
> - Currently, periodic boundaries are not taken into account.
> - The calculation is parallelized on a per-frame basis
>   [Paraskevakos2018]_; at the moment, no parallelization over trajectory
>   blocks is performed.

> **Collaborator (author):** Done
"""

def __init__(self, universe, atomgroup):
"""Parameters
iparask marked this conversation as resolved.
Show resolved Hide resolved
----------
Universe : :class:`~MDAnalysis.core.groups.Universe`
a :class:`MDAnalysis.core.groups.Universe` (the
`atomgroup` must belong to this Universe)

atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
atomgroup that are iterated in parallel

Attributes
----------

"""
#super(ParallelAnalysisBase, self).__init__()
self._trajectory = universe.trajectory
self._top = universe.filename
self._traj = universe.trajectory.filename
self._atomgroup = atomgroup

def _find_parcc(self, data, cutoff=15.0):
orbeckst marked this conversation as resolved.
Show resolved Hide resolved
window, index = data[0]
num = window[0].shape[0]
i_index = index[0]
j_index = index[1]
graph = nx.Graph()
if i_index == j_index:
train = window[0]
test = window[1]
else:
train = np.vstack([window[0], window[1]])
test = np.vstack([window[0], window[1]])
tree = cKDTree(train, leafsize=40)
> **Member:** Please add a comment here that this does not respect PBC;
> possible TODO: work with capped_distances() instead.

> **Collaborator (author):** What do you mean by PBC?

> **Member:** https://en.wikipedia.org/wiki/Periodic_boundary_conditions
>
> Like Pacman geometry: particles are simulated within a finite box of
> size [Lx, Ly, Lz], so particles near x=0 and x=Lx are actually very
> close to each other.
>
> So the leaflet is actually an infinite sheet and has no "edge". Without
> considering periodic boundaries you'll still identify the leaflets, but
> you'll miss some of the edges in the graph representation, which might
> be important for some analyses.

> **Collaborator (author):** Okay, I see. May I suggest the following:
> have a working implementation that does not take PBCs into account and
> does not break the trajectory into multiple blocks. As soon as that is
> done and this PR can be accepted, merge it so that we do not miss the
> release this is intended for.
>
> The next version of the implementation will take PBCs into account and
> also look at how to parallelize over trajectory blocks.
>
> @orbeckst, @richardjgowers, do you agree with that?

> **Member:** I am ok with an initial version that does not do PBC if
>
> - the docs mention this
> - we have an issue open for it
>
> Breaking the trajectory into blocks can also wait, in my opinion.

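A minimal sketch of the reviewer's TODO, not part of this PR: a PBC-aware neighbour search via MDAnalysis's `capped_distance`, assuming the box dimensions were threaded through to the worker alongside the coordinates:

```python
# Sketch only: replace the cKDTree query with a PBC-aware pair search.
from MDAnalysis.lib.distances import capped_distance

# `pairs` holds (i, j) index pairs with distance <= cutoff; passing the
# box applies the minimum-image convention, so edges that cross the
# periodic boundary are found as well.
pairs, distances = capped_distance(train, test, max_cutoff=cutoff,
                                   box=universe.dimensions)
```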
```python
        edges = tree.query_ball_point(test, cutoff)
        edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
                     for idx, dest_list in enumerate(edges)]

        edge_list_flat = np.array([list(item) for sublist in edge_list for
                                   item in sublist])
        if i_index == j_index:
            # Diagonal block: shift local indices to global atom indices.
            res = edge_list_flat.transpose()
            res[0] = res[0] + i_index - 1
            res[1] = res[1] + j_index - 1
        else:
            # Off-diagonal block: drop pairs whose atoms both fall in the
            # same half of the stacked array (those edges are already
            # found by the diagonal blocks), then shift to global indices.
            removed_elements = list()
            for i in range(edge_list_flat.shape[0]):
                if (edge_list_flat[i, 0] >= 0 and
                    edge_list_flat[i, 0] <= num - 1) and \
                   (edge_list_flat[i, 1] >= 0 and
                    edge_list_flat[i, 1] <= num - 1) or \
                   (edge_list_flat[i, 0] >= num and
                    edge_list_flat[i, 0] <= 2 * num - 1) and \
                   (edge_list_flat[i, 1] >= num and
                    edge_list_flat[i, 1] <= 2 * num - 1):
                    removed_elements.append(i)
            res = np.delete(edge_list_flat, removed_elements,
                            axis=0).transpose()
            res[0] = res[0] + i_index - 1
            res[1] = res[1] - num + j_index - 1

        edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
        graph.add_edges_from(edges)

        # partial connected components
        subgraphs = nx.connected_components(graph)
```
> **Member:** iirc networkx is wholly Python.
>
> This function in MDAnalysis:
> https://github.com/MDAnalysis/mdanalysis/blob/develop/package/MDAnalysis/lib/_cutil.pyx#L322
> is a C++ function which does the same: pass it node and edge indices and
> it will return a list of the indices in each subgraph.

> **Collaborator (author):** If I understand your comment correctly, you
> are suggesting to change from networkx to the function MDAnalysis
> implements, right?
>
> When I started the implementation of the leaflet finder for my paper I
> used the class that exists in MDAnalysis (link), where networkx is used.
> In addition, I was deliberately careful, when I built my version, to use
> widely used Python packages for things I did not want to reimplement.
>
> Did you reimplement a connected-component calculator, or did that
> pre-exist the leaflet finder?
>
> Also, I am extremely bad with abbreviations and have to google them all
> the time. Can you expand "iirc"?

> **Member:** Sorry, iirc = if I remember correctly.
>
> Yeah, I think the MDAnalysis function will be faster. It was added after
> the original leaflet finder class.
>
> If you want to keep this the same for a closer comparison to the
> original class, that's fine.

> **Member:** @richardjgowers did you benchmark find_fragments() against
> networkx?

> **Member:** (Also, we should update the serial LeafletFinder with
> capped_distances and find_fragments...)

> **Member:** I suggest we use networkx for the initial version because
> this is what was described in the paper.
>
> We can then open a new issue to optimize the parallel LeafletFinder and
> add PBC.
```python
        comp = [g for g in subgraphs]
        return comp

    def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0):
```
> **Member:** Why n_blocks – it's not used at the moment. Omit?
>
> (Unless you immediately want to parallelize over blocks, too.)

> **Collaborator (author):** The parallelization happens within a single
> frame; n_blocks specifies the number of partitions the data is divided
> into.

> **Collaborator (author):** I will remove it and change the way I use it
> in the code.

"""Perform computation on a single trajectory frame.

Must return computed values as a list. You can only **read**
from member variables stored in ``self``. Changing them during
a run will result in undefined behavior. `ts` and any of the
atomgroups can be changed (but changes will be overwritten
when the next time step is read).

Parameters
----------
atomgroups : tuple
Tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
instances that are updated to the current frame.
scheduler_kwargs : Dask Scheduler parameters.
cutoff : float (optional)
head group-defining atoms within a distance of `cutoff`
Angstroms are deemed to be in the same leaflet [15.0]

Returns
-------
values : anything
The output from the computation over a single frame must
be returned. The `value` will be added to a list for each
block and the list of blocks is stored as :attr:`_results`
before :meth:`_conclude` is run. In order to simplify
processing, the `values` should be "simple" shallow data
structures such as arrays or lists of numbers.

"""

# Get positions of the atoms in the atomgroup and find their number.
atoms = self._atomgroup.positions
matrix_size = atoms.shape[0]
arraged_coord = list()
part_size = int(matrix_size / n_blocks)
# Partition the data based on a 2-dimensional partitioning
for i in range(1, matrix_size + 1, part_size):
for j in range(1, matrix_size + 1, part_size):
arraged_coord.append(([atoms[i - 1:i - 1 + part_size],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this correctly, you are partitioning a (M, M) matrix A into sub-blocks. A_{ij} will represent a contact.

Because we evaluate the atomgroup with itself, this is symmetric, A_ij = A_ji. Couldn't you save on the order of M^2/2 evaluations by only taking the diagonal and lower triangle blocks?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me give it a try and see what is happening with the tests. This is an exact transport from the code I used for the paper. I will fix it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you have a look at optimizing this?

If it should be optimized, add it to the #72 issue.

For this PR we can put the code as it was for the paper.

atoms[j - 1:j - 1 + part_size]],
[i, j]))

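A sketch of the reviewer's symmetry suggestion, reusing the loop's variable names; the mirrored off-diagonal edges would then have to be reconstructed during the reduction (an assumption, not part of this PR):

```python
# Sketch only: visit diagonal and lower-triangle blocks, i.e. j <= i.
# Each off-diagonal block (i, j) then stands in for (j, i) as well,
# roughly halving the number of pair evaluations.
for i in range(1, matrix_size + 1, part_size):
    for j in range(1, i + 1, part_size):
        arraged_coord.append(([atoms[i - 1:i - 1 + part_size],
                               atoms[j - 1:j - 1 + part_size]],
                              [i, j]))
```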
```python
        # Distribute the data over the available cores, apply the map
        # function and execute.
        parAtoms = db.from_sequence(arraged_coord,
                                    npartitions=len(arraged_coord))
        parAtomsMap = parAtoms.map_partitions(self._find_parcc)
        Components = parAtomsMap.compute(**scheduler_kwargs)

        # Gather the results and start the reduction. TODO: think if it
        # can go to the private _reduce method of the base class.
        result = list(Components)
        # Merge the per-block components into the overall connected
        # components of the graph: repeatedly take one component and
        # union it with every component it intersects.
        while len(result) != 0:
            item1 = result[0]
            result.pop(0)
            ind = []
            for i, item2 in enumerate(Components):
                if item1.intersection(item2):
                    item1 = item1.union(item2)
                    ind.append(i)
            ind.reverse()
            [Components.pop(j) for j in ind]
            Components.append(item1)

        # Sort the indices within each component and return them.
        indices = [np.sort(list(g)) for g in Components]
        return indices
```

```python
    def run(self,
            start=None,
            stop=None,
            step=None,
            scheduler=None,
            n_jobs=1,
            n_blocks=None,
            cutoff=15.0):
        """Perform the calculation

        Parameters
        ----------
        start : int, optional
            start frame of analysis
        stop : int, optional
            stop frame of analysis
        step : int, optional
            number of frames to skip between each analysed frame
        scheduler : dask scheduler, optional
            Use dask scheduler, defaults to multiprocessing. This can be
            used to spread work to a distributed scheduler
        n_jobs : int, optional
            number of tasks to start, if `-1` use number of logical cpu
            cores. This argument will be ignored when the distributed
            scheduler is used
        n_blocks : int, optional
            number of partitions to divide trajectory frame into. If
            ``None`` set equal to sqrt(n_jobs) or number of available
            workers in scheduler.

        """
```

> **Member:** Comment that this is currently ignored: no parallelization
> over trajectory blocks. Or are you already working on adding the
> parallelization over frames, too?
```python
        if scheduler is None:
            scheduler = multiprocessing

        if n_jobs == -1:
            n_jobs = cpu_count()

        if n_blocks is None:
            if scheduler == multiprocessing:
                n_blocks = n_jobs
            elif isinstance(scheduler, distributed.Client):
                n_blocks = len(scheduler.ncores())
            else:
                raise ValueError(
                    "Couldn't guess ideal number of blocks from scheduler. "
                    "Please provide `n_blocks` in call to method.")

        universe = mda.Universe(self._top, self._traj)
        scheduler_kwargs = {'get': scheduler.get}
        if scheduler == multiprocessing:
            scheduler_kwargs['num_workers'] = n_jobs
```

> **Member:** Could be removed if n_blocks is not used.

> **Collaborator (author):** It is being used to produce the correct
> number of tasks. Because it is optional, if it is not set we set it
> based on the number of jobs. Now that I think about it, n_jobs is not
> really needed, since I parallelize based on the number of blocks.
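For the distributed case handled above, a hedged sketch of how a client would be passed in (a local test cluster; `u` and `head_groups` as in the earlier usage sketch):

```python
# Sketch only: use a dask.distributed client as the scheduler; n_blocks
# is then derived from the worker cores the client reports via ncores().
from dask import distributed

client = distributed.Client()  # spins up a local cluster for testing
lf = leaflet.LeafletFinder(u, head_groups)
lf.run(scheduler=client)
```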

```python
        start, stop, step = self._trajectory.check_slice_indices(
            start, stop, step)
        n_frames = len(range(start, stop, step))
        with timeit() as total:
            frames = []
            with self.readonly_attributes():
                for frame in range(start, stop, step):
                    leaflet = self._single_frame(
                        scheduler_kwargs=scheduler_kwargs,
                        n_blocks=n_blocks,
                        cutoff=cutoff)
                    frames.append(leaflet[0:2])
            self._results = frames
            with timeit() as conclude:
                self._conclude()
        # TODO: fix timings
        # self.timing = Timing(
        #     np.hstack([el[1] for el in res]),
        #     np.hstack([el[2] for el in res]), total.elapsed,
        #     np.array([el[3] for el in res]), time_prepare, conclude.elapsed)
        return self

    def _conclude(self):
        self.results = [self._atomgroup[i] for i in self._results]
```
pmda/test/test_leaflet.py (46 additions, 0 deletions)

@@ -0,0 +1,46 @@

```python
from __future__ import absolute_import, division, print_function

import pytest
from numpy.testing import (assert_almost_equal, assert_array_equal,
                           assert_array_almost_equal)

import MDAnalysis
from MDAnalysisTests.datafiles import Martini_membrane_gro
from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT
from pmda import leaflet


class TestLeafLet(object):

    @pytest.fixture()
    def u_one_frame(self):
        return MDAnalysis.Universe(Martini_membrane_gro)

    @pytest.fixture()
    def universe(self):
        return MDAnalysis.Universe(Martini_membrane_gro)

    @pytest.fixture()
    def correct_values(self):
        pass

    @pytest.fixture()
    def correct_values_single_frame(self):
        return [range(1, 2150, 12), range(2521, 4670, 12)]

    def test_leaflet(self, universe):
        pass

    def test_leaflet_step(self, universe, correct_values):
        pass

    def test_leaflet_single_frame(self,
                                  u_one_frame,
                                  correct_values_single_frame):
        lipid_heads = u_one_frame.select_atoms("name PO4")
        u_one_frame.trajectory.rewind()
        leaflets = leaflet.LeafletFinder(u_one_frame,
                                         lipid_heads).run(start=0, stop=1)
        assert_almost_equal(leaflets.results[0].indices,
                            correct_values_single_frame,
                            err_msg="error: leaflets should match " +
                                    "test values")
```
setup.py (3 additions, 1 deletion)

@@ -51,7 +51,9 @@

```diff
         'MDAnalysis>=0.18',
         'dask',
         'six',
-        'joblib', # cpu_count func currently
+        'joblib',
+        'networkx',
+        'scipy', # cpu_count func currently
     ],
     tests_require=[
         'pytest',
```