Skip to content

Commit

Permalink
Added utilities to iterate over dimensions or blocks.
Browse files Browse the repository at this point in the history
These are analogous to the R package's blockApply with the corresponding
row/colAutoGrid functions to iterate over a single dimension.
  • Loading branch information
LTLA committed Jan 21, 2024
1 parent 0b5fdb5 commit 19d821f
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 80 deletions.
3 changes: 2 additions & 1 deletion src/delayedarray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@
from .is_sparse import is_sparse
from .chunk_shape import chunk_shape
from .is_pristine import is_pristine
from .guess_iteration_block_size import guess_iteration_block_size
from .apply_over_dimension import apply_over_dimension, choose_block_size_for_1d_iteration, guess_iteration_block_size
from .apply_over_blocks import apply_over_blocks, choose_block_shape_for_iteration
from .wrap import wrap
118 changes: 118 additions & 0 deletions src/delayedarray/apply_over_blocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import Callable, Optional, Tuple
import math

from .chunk_shape import chunk_shape
from .is_sparse import is_sparse
from .extract_dense_array import extract_dense_array
from .extract_sparse_array import extract_sparse_array

__author__ = "ltla"
__copyright__ = "ltla"
__license__ = "MIT"


def choose_block_shape_for_iteration(x, memory: int = 10000000) -> Tuple[int, ...]:
"""
Choose the block dimensions for blockwise iteration through an array, see
`~apply_over_blocks` for details.
Args:
x: An array-like object.
dimension: Dimension to iterate over.
memory: Available memory in bytes, to hold a single block in memory.
Returns:
Dimensions of the blocks.
"""
num_elements = memory / x.dtype.itemsize
chunk_dims = chunk_shape(x)
block_size = 1
for s in chunk_dims:
block_size *= s

block_dims = list(chunk_dims)

if num_elements > block_size:
# Going from the first dimension and increasing the block size.
for i, y in enumerate(block_dims):
multiple = int(num_elements / block_size)
if multiple <= 1:
break
block_size /= y
block_dims[i] = min(multiple * y, x.shape[i])
block_size *= block_dims[i]

elif num_elements < block_size:
# Going from the last dimension and decreasing the block size.
for i in range(len(block_dims) - 1, -1, -1):
block_size_other = block_size / block_dims[i]
multiple = int(num_elements / block_size_other)
if multiple >= 1:
block_dims[i] = multiple
break
block_size = block_size_other
block_dims[i] = 1


return (*block_dims,)


def apply_over_blocks(x, fun: Callable, block_shape: Optional[Tuple] = None, allow_sparse: bool = False) -> list:
"""
Iterate over an array by blocks. We apply a user-provided function and
collect the results before proceeding to the next block.
Args:
x: An array-like object.
fun:
Function to apply to each block. This should accept two arguments;
the first is a list containing the start/end of the current block
on each dimension, and the second is the block contents. Each
block is typically provided as a :py:class:`~numpy.ndarray`.
block_shape:
Dimensionsof the block on the iteration dimension. If None, this is
chosen by :py:func:`~choose_block_shape_for_iteration`.
allow_sparse:
Whether to allow extraction of sparse subarrays. If true and
``x`` contains a sparse array, the block contents are instead
represented by a :py:class:`~SparseNdarray.SparseNdarray`.
Returns:
List containing the output of ``fun`` on each block.
"""
if block_shape is None:
block_shape = choose_block_shape_for_iteration(x, dimension)

num_tasks_total = 1
num_tasks_by_dim = []
for i, d in enumerate(x.shape):
curtasks = math.ceil(d / block_shape[i])
num_tasks_by_dim.append(curtasks)
num_tasks_total *= curtasks

if allow_sparse and is_sparse(x):
extractor = extract_sparse_array
else:
extractor = extract_dense_array

collected = []
for job in range(num_tasks_total):
subsets = []
position = []
counter = job
for i, d in enumerate(num_tasks_by_dim):
curtask = counter % d
start = curtask * block_shape[i]
end = min(start + block_shape[i], x.shape[i])
position.append((start, end))
subsets.append(range(start, end))
counter //= d
output = fun(position, extractor(x, (*subsets,)))
collected.append(output)

return collected
103 changes: 103 additions & 0 deletions src/delayedarray/apply_over_dimension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from typing import Callable, Optional
import math

from .chunk_shape import chunk_shape
from .is_sparse import is_sparse
from .extract_dense_array import extract_dense_array
from .extract_sparse_array import extract_sparse_array

__author__ = "ltla"
__copyright__ = "ltla"
__license__ = "MIT"

def guess_iteration_block_size(x, dimension, memory: int = 10000000) -> int:
"""
Soft-deprecated alias for :py:func:`~choose_block_size_for_1d_iteration`.
"""
return choose_block_size_for_1d_iteration(x, dimension, memory)

def choose_block_size_for_1d_iteration(x, dimension: int, memory: int = 10000000) -> int:
"""
Choose a block size for iterating over an array on a certain dimension,
see `~apply_over_dimension` for more details.
Args:
x: An array-like object.
dimension: Dimension to iterate over.
memory: Available memory in bytes, to hold a single block in memory.
Returns:
Size of the block on the iteration dimension.
"""
num_elements = memory / x.dtype.itemsize
shape = x.shape

prod_other = 1
for i, s in enumerate(shape):
if i != dimension:
prod_other *= s

ideal = int(num_elements / prod_other)
if ideal == 0:
return 1

curdim = chunk_shape(x)[dimension]
if ideal <= curdim:
return ideal

return int(ideal / curdim) * curdim


def apply_over_dimension(x, dimension: int, fun: Callable, block_size: Optional[int] = None, allow_sparse: bool = False) -> list:
"""
Iterate over an array on a certain dimension. At each iteration, the block
of observations consists of the full extent of all dimensions other than
the one being iterated over. We apply a user-provided function and collect
the results before proceeding to the next block.
Args:
x: An array-like object.
dimension: Dimension to iterate over.
fun:
Function to apply to each block. This should accept two arguments;
the first is a tuple containing the start/end of the current block
on the chosen ``dimension``, and the second is the block contents.
Each block is typically provided as a :py:class:`~numpy.ndarray`.
block_size:
Size of the block on the iteration dimension. If None, this is
chosen by :py:func:`~choose_block_size_for_1d_iteration`.
allow_sparse:
Whether to allow extraction of sparse subarrays. If true and
``x`` contains a sparse array, the block contents are instead
represented by a :py:class:`~SparseNdarray.SparseNdarray`.
Returns:
List containing the output of ``fun`` on each block.
"""
if block_size is None:
block_size = choose_block_size_for_1d_iteration(x, dimension)

limit = x.shape[dimension]
tasks = math.ceil(limit / block_size)
components = [range(n) for n in x.shape]
if allow_sparse and is_sparse(x):
extractor = extract_sparse_array
else:
extractor = extract_dense_array

collected = []
for job in range(tasks):
start = job * block_size
end = min(start + block_size, limit)
components[dimension] = range(start, end)
subset = (*components,)
output = fun((start, end), extractor(x, subset))
collected.append(output)

return collected
41 changes: 0 additions & 41 deletions src/delayedarray/guess_iteration_block_size.py

This file was deleted.

84 changes: 84 additions & 0 deletions tests/test_apply_over_blocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import numpy as np
import delayedarray as da
import math
import scipy


class _ChunkyBoi:
def __init__(self, shape, chunks):
self._shape = shape
self._chunks = chunks

@property
def dtype(self):
return np.dtype("float64")

@property
def shape(self):
return self._shape


@da.chunk_shape.register
def chunk_shape_ChunkyBoi(x: _ChunkyBoi):
return x._chunks


def test_choose_block_shape_for_iteration():
x = np.random.rand(100, 10)
assert da.choose_block_shape_for_iteration(x, memory=200) == (2, 10)
assert da.choose_block_shape_for_iteration(x, memory=800) == (10, 10)

# Not enough memory.
assert da.choose_block_shape_for_iteration(x, memory=0) == (1, 1)
assert da.choose_block_shape_for_iteration(x, memory=40) == (1, 5)

x = _ChunkyBoi((100, 200), (20, 25))
assert da.choose_block_shape_for_iteration(x, memory=4000) == (20, 25)
assert da.choose_block_shape_for_iteration(x, memory=40000) == (100, 50)
assert da.choose_block_shape_for_iteration(x, memory=80000) == (100, 100)


def test_apply_over_dimension_dense():
x = np.ndarray([100, 200])
counter = 0
for i in range(x.shape[0]):
for j in range(x.shape[1]):
x[i,j] = counter
counter += 1

def dense_sum(position, block):
return position, block.sum()

output = da.apply_over_blocks(x, dense_sum, block_shape=(3, 7))
assert len(output) == math.ceil(x.shape[0]/3) * math.ceil(x.shape[1]/7)
assert x.sum() == sum(y[1] for y in output)
assert output[0][0] == [(0, 3), (0, 7)]
assert output[-1][0] == [(99, 100), (196, 200)]


def test_apply_over_dimension_sparse():
x = scipy.sparse.random(100, 200, 0.2).tocsc()

def dense_sum(position, block):
return position, block.sum()

output = da.apply_over_blocks(x, dense_sum, block_shape=(3, 7))
assert len(output) == math.ceil(x.shape[0]/3) * math.ceil(x.shape[1]/7)
assert np.allclose(x.sum(), sum(y[1] for y in output))
assert output[0][0] == [(0, 3), (0, 7)]
assert output[-1][0] == [(99, 100), (196, 200)]

# Now activating sparse mode.
def sparse_sum(position, block):
assert isinstance(block, da.SparseNdarray)
total = 0
for v in block.contents:
if v is not None:
total += v[1].sum()
return position, total

output = da.apply_over_blocks(x, sparse_sum, block_shape=(3, 7), allow_sparse=True)
assert len(output) == math.ceil(x.shape[0]/3) * math.ceil(x.shape[1]/7)
assert np.allclose(x.sum(), sum(y[1] for y in output))
assert output[0][0] == [(0, 3), (0, 7)]
assert output[-1][0] == [(99, 100), (196, 200)]
Loading

0 comments on commit 19d821f

Please sign in to comment.