Skip to content

Commit

Permalink
Allow more interactions between numpy and lazyexpr engine
Browse files Browse the repository at this point in the history
  • Loading branch information
FrancescAlted committed Nov 2, 2024
1 parent 3d43aed commit 78cf62a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 15 deletions.
8 changes: 8 additions & 0 deletions src/blosc2/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <[email protected]>
# All rights reserved.
#
# This source code is licensed under a BSD-style license (found in the
# LICENSE file in the root directory of this source tree)
#######################################################################

import re


Expand Down
49 changes: 43 additions & 6 deletions src/blosc2/lazyexpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import blosc2
from blosc2 import compute_chunks_blocks
from blosc2.info import InfoReporter
from blosc2.ndarray import get_chunks_idx
from blosc2.ndarray import get_chunks_idx, _check_allowed_dtypes


def is_inside_eval():
Expand Down Expand Up @@ -1125,7 +1125,8 @@ def reduce_slices(
dtype = result.dtype
if is_inside_eval():
# We already have the dtype and reduced_shape, so return immediately
return np.zeros(reduced_shape, dtype=dtype)
# Use a blosc2 container, as it consumes less memory in general
return blosc2.zeros(reduced_shape, dtype=dtype)
out = convert_none_out(dtype, reduce_op, reduced_shape)

# Update the output array with the result
Expand Down Expand Up @@ -1482,7 +1483,7 @@ def update_expr(self, new_op):
try:
op_name = list(value2.operands.keys())[list(value2.operands.values()).index(value1)]
except ValueError:
op_name = f"o{len(self.operands)}"
op_name = f"o{len(value2.operands)}"
new_operands = {op_name: value1}
if op == "[]": # syntactic sugar for slicing
expression = f"({op_name}[{self.expression}])"
Expand Down Expand Up @@ -1535,6 +1536,39 @@ def blocks(self):
self._chunks, self._blocks = compute_chunks_blocks(self.shape, None, None, dtype=self.dtype)
return self._blocks

def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
# Handle operations at the array level
if method != "__call__":
return NotImplemented

ufunc_map = {
np.add: "+",
np.subtract: "-",
np.multiply: "*",
np.divide: "/",
np.true_divide: "/",
np.power: "**",
np.less: "<",
np.less_equal: "<=",
np.greater: ">",
np.greater_equal: ">=",
np.equal: "==",
np.not_equal: "!=",
np.sqrt: "sqrt",
}

if ufunc in ufunc_map:
if ufunc == np.sqrt:
# Special case for sqrt
value = inputs[0]
_check_allowed_dtypes(value)
return blosc2.LazyExpr(new_op=(value, "sqrt", None))
value = inputs[0] if inputs[1] is self else inputs[1]
_check_allowed_dtypes(value)
return blosc2.LazyExpr(new_op=(value, ufunc_map[ufunc], self))

return NotImplemented

def __neg__(self):
return self.update_expr(new_op=(0, "-", self))

Expand Down Expand Up @@ -1820,9 +1854,12 @@ def save(self, urlpath=None, **kwargs):
raise ValueError(
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
)
if value.schunk.urlpath is None:
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
operands[key] = value.schunk.urlpath
if key != "blosc2":
if value.schunk.urlpath is None:
raise ValueError(
"To save a LazyArray, all operands must be stored on disk/network"
)
operands[key] = value.schunk.urlpath
array.schunk.vlmeta["_LazyArray"] = {
"expression": self.expression,
"UDF": None,
Expand Down
16 changes: 11 additions & 5 deletions src/blosc2/ndarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ def get_chunks_idx(shape, chunks):


def _check_allowed_dtypes(
value: bool | int | float | str | NDArray | NDField | blosc2.C2Array | blosc2.Proxy,
value: bool | int | float | str | blosc2.NDArray | blosc2.NDField | blosc2.C2Array | blosc2.Proxy,
):
if not (
isinstance(
value,
blosc2.LazyExpr
| NDArray
| NDField
| blosc2.NDArray
| blosc2.NDField
| blosc2.C2Array
| blosc2.Proxy
| blosc2.ProxyNDField
Expand Down Expand Up @@ -574,8 +574,6 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
# Handle operations at the array level
if method != "__call__":
return NotImplemented
value = inputs[0] if inputs[1] is self else inputs[1]
_check_allowed_dtypes(value)

ufunc_map = {
np.add: "+",
Expand All @@ -590,9 +588,17 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
np.greater_equal: ">=",
np.equal: "==",
np.not_equal: "!=",
np.sqrt: "sqrt",
}

if ufunc in ufunc_map:
if ufunc == np.sqrt:
# Special case for sqrt
value = inputs[0]
_check_allowed_dtypes(value)
return blosc2.LazyExpr(new_op=(value, "sqrt", None))
value = inputs[0] if inputs[1] is self else inputs[1]
_check_allowed_dtypes(value)
return blosc2.LazyExpr(new_op=(value, ufunc_map[ufunc], self))

return NotImplemented
Expand Down
27 changes: 23 additions & 4 deletions tests/ndarray/test_reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,22 +224,36 @@ def test_save(disk, fill_value, reduce_op, axis):
urlpath = "a1.b2nd" if disk else None
if fill_value != 0:
a = blosc2.full(shape, fill_value, urlpath=urlpath, mode="w")
b = blosc2.full(shape, fill_value - .1, urlpath="b.b2nd", mode="w")
else:
a = blosc2.zeros(shape, dtype=np.float64, urlpath=urlpath, mode="w")
b = blosc2.zeros(shape, dtype=np.float64, urlpath="b.b2nd", mode="w") - .1
if disk:
a = blosc2.open(urlpath)
b = blosc2.open("b.b2nd")
na = a[:]
nb = b[:]

expr = f"a + a.{reduce_op}(axis={axis})"
lexpr = blosc2.lazyexpr(expr, operands={"a": a})
expr = f"a + b.{reduce_op}(axis={axis})"
lexpr = blosc2.lazyexpr(expr, operands={"a": a, "b": b})
if disk:
lexpr.save("out.b2nd")
lexpr = blosc2.open("out.b2nd")
res = lexpr.compute()
nres = na + getattr(na[()], reduce_op)(axis=axis)
nres = na + getattr(nb[()], reduce_op)(axis=axis)
assert np.allclose(res[()], nres)

# A expression with a single operand that is reduced should be supported as well
# Test an expression with a reduction in front
expr = f"a.{reduce_op}(axis={axis}) + b"
lexpr = blosc2.lazyexpr(expr, operands={"a": a, "b": b})
if disk:
lexpr.save("out.b2nd")
lexpr = blosc2.open("out.b2nd")
res = lexpr.compute()
nres = getattr(na[()], reduce_op)(axis=axis) + nb
assert np.allclose(res[()], nres)

# An expression with a single operand that is reduced should be supported as well
expr = f"a.{reduce_op}(axis={axis})"
lexpr = blosc2.lazyexpr(expr, operands={"a": a})
if disk:
Expand All @@ -249,3 +263,8 @@ def test_save(disk, fill_value, reduce_op, axis):
nres = getattr(na[()], reduce_op)(axis=axis)

assert np.allclose(res[()], nres)

if disk:
blosc2.remove_urlpath("a1.b2nd")
blosc2.remove_urlpath("b.b2nd")
blosc2.remove_urlpath("out.b2nd")

0 comments on commit 78cf62a

Please sign in to comment.