add computations in distributed mode (WIP)
jli05 committed May 6, 2017
1 parent 45e736c commit edecb43
Showing 5 changed files with 583 additions and 0 deletions.
180 changes: 180 additions & 0 deletions cumulants_dist.py
@@ -0,0 +1,180 @@
''' Cumulant computations in distributed mode
'''
from itertools import islice
import numpy as np
import mxnet as mx
from cumulants import (equal_partitions, contrib_m1, contrib_prod_e2_x,
                       contrib_whiten_e3, contrib_whiten_e2m1)
from partitioned_data import pload, pmeta

# Shared KVStore for aggregating contributions across workers
KVSTORE = mx.kvstore.create('dist')
KEY_M1 = 100
KEY_PROD_M2X = 110
KEY_WHITENED_E3 = 120
KEY_WHITENED_E2M1 = 130


def nth(iterable, n, default=None):
"Returns the nth item or a default value"
# pylint: disable=invalid-name
return next(islice(iterable, n, None), default)

def moment1_dist(docs, n_partitions=1):
''' Compute M1 in distributed mode
Parameters
-----------
docs : str
Path for the entire collection of word count vectors.
n_partitions : int
Number of partitions over the document collection, >= 1.
Returns
----------
out : length-vocab_size array
M1 of the entire document collection
'''
    n_docs, vocab_size, _ = pmeta(docs)
    assert n_docs >= 1 and vocab_size >= 1
    assert 1 <= n_partitions <= n_docs

    # Use the shared distributed KVStore declared at module level
    m1_mx = mx.nd.zeros((vocab_size,))
    KVSTORE.init(KEY_M1, m1_mx)

    # Push this worker's contribution to M1
    start, end = nth(equal_partitions(n_docs, n_partitions), KVSTORE.rank)
    curr_partition = pload(docs, start, end)
    contrib = contrib_m1(curr_partition, n_docs)
    KVSTORE.push(KEY_M1, mx.nd.array(contrib))

    # Reduce and pull the aggregated M1
    KVSTORE.pull(KEY_M1, out=m1_mx)
    return m1_mx.asnumpy()

def prod_m2_x_dist(docs, test_x, alpha0, docs_m1=None, n_partitions=1):
''' Compute the product of M2 by test matrix X in distributed mode
Parameters
-----------
docs : str
Path for the entire collection of word count vectors.
test_x : vocab_size-by-k array
Test matrix where k is the number of factors.
alpha0 : float
Sum of the Dirichlet prior parameter.
docs_m1: length-vocab_size array, optional
M1 of the entire collection of word count vectors.
n_partitions : int, optional
Number of partitions, 1 by default.
Returns
-----------
out : vocab_size-by-k array
Product of M2 by X.
'''
def adjust(prod_e2x, docs_m1, test_x, alpha0):
''' Adjust for the final result '''
adj = alpha0 / (alpha0 + 1) * np.outer(docs_m1, docs_m1.dot(test_x))
return prod_e2x - adj

    n_docs, vocab_size, _ = pmeta(docs)
    _vocab_size, num_factors = test_x.shape
    assert n_docs >= 1 and vocab_size >= 1
    assert 1 <= n_partitions <= n_docs
    assert vocab_size == _vocab_size and num_factors >= 1
if docs_m1 is not None:
assert docs_m1.ndim == 1 and vocab_size == len(docs_m1)
assert alpha0 > 0

# Compute M1 if not provided
if docs_m1 is None:
docs_m1 = moment1_dist(docs, n_partitions)

    # Init the result on the shared distributed KVStore
    prod_e2x_mx = mx.nd.zeros((vocab_size, num_factors))
    KVSTORE.init(KEY_PROD_M2X, prod_e2x_mx)

    # Push this worker's contribution to the product of E2 and X
    start, end = nth(equal_partitions(n_docs, n_partitions), KVSTORE.rank)
    curr_partition = pload(docs, start, end)
    contrib = contrib_prod_e2_x(curr_partition, test_x, n_docs)
    KVSTORE.push(KEY_PROD_M2X, mx.nd.array(contrib))

    # Reduce and pull the aggregated product of E2 and X
    KVSTORE.pull(KEY_PROD_M2X, out=prod_e2x_mx)

return adjust(prod_e2x_mx.asnumpy(), docs_m1, test_x, alpha0)

def whiten_m3_dist(docs, whn, alpha0, docs_m1=None, n_partitions=1):
''' Whiten M3 in distributed mode
Parameters
-----------
docs : str
Path for the entire collection of word count vectors.
whn : vocab_size-by-k array
Whitening matrix.
alpha0 : float
        Sum of the Dirichlet prior parameter.
docs_m1 : length-vocab_size array, optional
M1 of the entire collection of word count vectors.
n_partitions : int, optional
Number of partitions, 1 by default.
Returns
----------
out : k-by-(k ** 2) array
Whitened M3, unfolded version.
'''
def adjust(whitened_e3, whitened_e2m1, docs_m1, whn, alpha0):
''' Adjust for the final result '''
_, num_factors = whn.shape
# length-k
whitened_m1 = docs_m1.dot(whn)
whitened_m1_3 = (np.einsum('i,j,k->ijk', whitened_m1, whitened_m1,
whitened_m1).reshape((num_factors, -1)))

coeff1 = alpha0 / (alpha0 + 2)
coeff2 = 2 * alpha0 ** 2 / (alpha0 + 1) / (alpha0 + 2)
return (whitened_e3 - coeff1 * whitened_e2m1
+ coeff2 * whitened_m1_3)

    n_docs, vocab_size, _ = pmeta(docs)
    _vocab_size, num_factors = whn.shape
    assert n_docs >= 1 and vocab_size >= 1
    assert 1 <= n_partitions <= n_docs
    assert vocab_size == _vocab_size and num_factors >= 1
if docs_m1 is not None:
assert docs_m1.ndim == 1 and vocab_size == len(docs_m1)
assert alpha0 > 0

# Compute M1 if not provided
if docs_m1 is None:
docs_m1 = moment1_dist(docs, n_partitions)

    # Init the results on the shared distributed KVStore
    whitened_e3_mx = mx.nd.zeros((num_factors, num_factors ** 2))
    whitened_e2m1_mx = mx.nd.zeros((num_factors, num_factors ** 2))
    KVSTORE.init(KEY_WHITENED_E3, whitened_e3_mx)
    KVSTORE.init(KEY_WHITENED_E2M1, whitened_e2m1_mx)

    # Push this worker's contributions to whitened E3 and E2M1
    start, end = nth(equal_partitions(n_docs, n_partitions), KVSTORE.rank)
    curr_partition = pload(docs, start, end)
    contrib_e3 = contrib_whiten_e3(curr_partition, whn, n_docs)
    contrib_e2m1 = contrib_whiten_e2m1(curr_partition, docs_m1,
                                       whn, n_docs)
    KVSTORE.push(KEY_WHITENED_E3, mx.nd.array(contrib_e3))
    KVSTORE.push(KEY_WHITENED_E2M1, mx.nd.array(contrib_e2m1))

    # Reduce and pull the aggregated whitened E3 and E2M1
    KVSTORE.pull(KEY_WHITENED_E3, out=whitened_e3_mx)
    KVSTORE.pull(KEY_WHITENED_E2M1, out=whitened_e2m1_mx)

return adjust(whitened_e3_mx.asnumpy(), whitened_e2m1_mx.asnumpy(),
docs_m1, whn, alpha0)
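
A minimal driver sketch for these routines (not part of the commit) could look as follows. The data path, alpha0, the number of factors, and the worker count are all hypothetical; each worker would run the same script under a distributed launcher such as MXNet's tools/launch.py, so that KVSTORE is created with one rank per worker.

''' driver.py -- hypothetical usage sketch '''
import numpy as np
from partitioned_data import pmeta
from cumulants_dist import moment1_dist, prod_m2_x_dist

DOCS = 'data/wordcounts'   # hypothetical partitioned word-count data
ALPHA0 = 1.0               # assumed sum of the Dirichlet prior
NUM_FACTORS = 10           # assumed number of factors
N_PARTITIONS = 4           # e.g. one partition per worker

_, vocab_size, _ = pmeta(DOCS)
test_x = np.random.randn(vocab_size, NUM_FACTORS)

# Every worker executes the same calls; the KVStore sums the
# per-partition contributions across workers
docs_m1 = moment1_dist(DOCS, n_partitions=N_PARTITIONS)
prod_m2x = prod_m2_x_dist(DOCS, test_x, ALPHA0, docs_m1=docs_m1,
                          n_partitions=N_PARTITIONS)

# Launched e.g. with:
#   python tools/launch.py -n 4 python driver.py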
166 changes: 166 additions & 0 deletions partitioned_data.py
@@ -0,0 +1,166 @@
''' Routines for accessing a partitioned array
For a data set we store the meta information and the data partitions
under one directory. The list of files is:
    .meta
    p000000.npy
    p000001.npy
    p000002.npy
    ...
The layout of `.meta` is
    # height width partition_size
    <height> <width> <partition_size>
which gives the number of rows (`height`) and of columns (`width`) of
the entire data set, and the number of rows of each partition
(`partition_size`).
`p000000.npy`, `p000001.npy`, `p000002.npy`, etc. store the partitions
of the data, each with `partition_size` rows. Each partition can be a
NumPy array or a sparse csr_matrix.
'''
from pathlib import Path
import shutil
import numpy as np
import scipy.sparse as sps

def pmeta(fname):
''' Return meta data of the partitioned array
PARAMETERS
-----------
fname : str
Path to the partitioned array.
RETURNS
-----------
height : int
Number of rows of the entire data set.
width : int
        Number of columns of the entire data set.
partition_size : int
Number of rows in each partition.
'''
height, width, partition_size = np.loadtxt(fname + '/.meta', dtype=int)
assert height >= 1 and width >= 1
assert partition_size >= 1
return height, width, partition_size

def _vstack(partitions):
''' vstack array or sparse matrix '''
# pylint: disable=no-else-return
if sps.issparse(partitions[0]):
return sps.vstack(partitions)
else:
return np.vstack(partitions)

def pload(fname, start, end):
''' Load specified range of the partitioned array
PARAMETERS
-----------
fname : str
Path to the partitioned array.
start : int
Index of the starting row, inclusive.
end : int
Index of the ending row, exclusive.
RETURNS
-----------
out : array or csr_matrix
Specified range of the partitioned array.
'''

height, _, partition_size = pmeta(fname)
assert start >= 0 and end >= start
assert end <= height

    # Compute the start and end partition IDs
    start_partition_id = start // partition_size
    end_partition_id = (end + partition_size - 1) // partition_size
    # Partition file names carry at most 6-digit IDs
    assert end_partition_id <= 1e6

# Retrieve all the partitions
path = Path(fname)
partitions = []
for partition_id in range(start_partition_id, end_partition_id):
partition = np.load(path / 'p{:06d}.npy'.format(partition_id))
        # A sparse csr_matrix comes back wrapped in a 0-d object array
        if partition.dtype == 'object':
            partition = partition.item()

partitions.append(partition)

# Concatenate into a superset of the requested data
superset = _vstack(partitions)

# Return requested data
offset = start_partition_id * partition_size
return superset[(start - offset):(end - offset)]

def psave(fname, arr, shape, partition_size, force=False):
''' Save partitioned array
PARAMETERS
-----------
fname : str
Path under which to save the partitioned array.
arr : iterable
Iterable of partitions of the array, each partition
could be NumPy array or sparse csr_matrix.
shape : tuple
Shape of the entire array.
partition_size : int
Size of each partition.
    force : bool, optional
        If False (default), raise an error when the path already
        holds a partitioned array; if True, overwrite it.
'''
path = Path(fname)

    # Refuse to overwrite an existing partitioned array
    # unless force=True
    if not force and (path / '.meta').exists():
        raise RuntimeError('{} non-empty'.format(path))

    # Recreate the directory from scratch
    shutil.rmtree(str(path), ignore_errors=True)
    path.mkdir()

# Write .meta
with (path / '.meta').open(mode='w') as fmeta:
fmeta.write('# height width partition_size\n')
fmeta.write('{} {} {}'.format(shape[0], shape[1],
partition_size))

    # Once at least partition_size rows have accumulated,
    # flush them into partition files
count_rows = 0
list_rows = []
partition_id = 0
for rows in arr:
list_rows.append(rows)
count_rows += rows.shape[0]

if count_rows >= partition_size:
cache_arr = _vstack(list_rows)
for i in range(count_rows // partition_size):
fpart = path / 'p{:06d}.npy'.format(partition_id + i)
np.save(fpart, cache_arr[(i * partition_size):
((i + 1) * partition_size)])

partition_id += count_rows // partition_size
remaining_rows = count_rows % partition_size
list_rows.clear()
if remaining_rows > 0:
list_rows.append(cache_arr[-remaining_rows:, :])
count_rows = remaining_rows

    # Write the last, partial partition if any rows remain
if count_rows > 0:
assert count_rows < partition_size
fpart = path / 'p{:06d}.npy'.format(partition_id)
np.save(fpart, _vstack(list_rows))
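
For illustration, a small round trip with the partition I/O above might look like this (the directory name 'demo_parts' is hypothetical, and force=True lets the sketch overwrite leftovers):

import numpy as np
from partitioned_data import psave, pmeta, pload

arr = np.arange(30).reshape(10, 3)
# psave consumes an iterable of row blocks; here two blocks of
# 4 and 6 rows are re-cut into five 2-row partitions on disk
psave('demo_parts', iter([arr[:4], arr[4:]]), arr.shape,
      partition_size=2, force=True)

print(pmeta('demo_parts'))        # (10, 3, 2)
print(pload('demo_parts', 3, 7))  # rows 3..6 of arr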