Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change array creation signature to allow sharding specification [do not merge] #2169

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@
import numpy as np
import numpy.typing as npt

from zarr.core.array import Array, AsyncArray
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat
from zarr.core.array import Array, AsyncArray, ChunkSpec
from zarr.core.common import (
JSON,
AccessModeLiteral,
ChunkCoords,
MemoryOrder,
ShapeLike,
ZarrFormat,
)
from zarr.core.group import AsyncGroup
from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
from zarr.store import (
Expand Down Expand Up @@ -561,7 +568,7 @@ async def open_group(
async def create(
shape: ChunkCoords,
*, # Note: this is a change from v2
chunks: ChunkCoords | None = None, # TODO: v2 allowed chunks=True
chunks: ChunkSpec | ShapeLike | None = None, # TODO: v2 allowed chunks=True
dtype: npt.DTypeLike | None = None,
compressor: dict[str, JSON] | None = None, # TODO: default and type change
fill_value: Any = 0, # TODO: need type
Expand All @@ -583,7 +590,6 @@ async def create(
meta_array: Any | None = None, # TODO: need type
attributes: dict[str, JSON] | None = None,
# v3 only
chunk_shape: ChunkCoords | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
| tuple[Literal["default"], Literal[".", "/"]]
Expand Down Expand Up @@ -674,15 +680,6 @@ async def create(
or _default_zarr_version()
)

if zarr_format == 2 and chunks is None:
chunks = shape
if zarr_format == 3 and chunk_shape is None:
if chunks is not None:
chunk_shape = chunks
chunks = None
else:
chunk_shape = shape

if order is not None:
warnings.warn(
"order is deprecated, use config `array.order` instead",
Expand Down Expand Up @@ -729,7 +726,6 @@ async def create(
filters=filters,
dimension_separator=dimension_separator,
zarr_format=zarr_format,
chunk_shape=chunk_shape,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
dimension_names=dimension_names,
Expand Down
91 changes: 64 additions & 27 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import numpy.typing as npt

from zarr.abc.store import set_or_delete
from zarr.codecs import BytesCodec
from zarr.codecs import BytesCodec, ShardingCodec
from zarr.codecs._v2 import V2Compressor, V2Filters
from zarr.core.attributes import Attributes
from zarr.core.buffer import BufferPrototype, NDArrayLike, NDBuffer, default_buffer_prototype
Expand Down Expand Up @@ -71,6 +71,13 @@
# Array and AsyncArray are defined in the base ``zarr`` namespace
__all__ = ["parse_array_metadata", "create_codec_pipeline"]

from typing import TypedDict


class ChunkSpec(TypedDict, total=False):
read_shape: tuple[int, ...]
write_shape: tuple[int, ...]


def parse_array_metadata(data: Any) -> ArrayV2Metadata | ArrayV3Metadata:
if isinstance(data, ArrayV2Metadata | ArrayV3Metadata):
Expand Down Expand Up @@ -126,8 +133,6 @@ async def create(
zarr_format: ZarrFormat = 3,
fill_value: Any | None = None,
attributes: dict[str, JSON] | None = None,
# v3 only
chunk_shape: ChunkCoords | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
| tuple[Literal["default"], Literal[".", "/"]]
Expand All @@ -136,8 +141,8 @@ async def create(
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
chunks: ChunkSpec | ShapeLike | None = None,
# v2 only
chunks: ShapeLike | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
Expand All @@ -150,16 +155,6 @@ async def create(

shape = parse_shapelike(shape)

if chunk_shape is None:
if chunks is None:
chunk_shape = chunks = _guess_chunks(shape=shape, typesize=np.dtype(dtype).itemsize)
else:
chunks = parse_shapelike(chunks)

chunk_shape = chunks
elif chunks is not None:
raise ValueError("Only one of chunk_shape or chunks must be provided.")

if zarr_format == 3:
if dimension_separator is not None:
raise ValueError(
Expand All @@ -181,7 +176,7 @@ async def create(
store_path,
shape=shape,
dtype=dtype,
chunk_shape=chunk_shape,
chunks=chunks,
fill_value=fill_value,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
Expand All @@ -204,7 +199,7 @@ async def create(
store_path,
shape=shape,
dtype=dtype,
chunks=chunk_shape,
chunks=chunks,
dimension_separator=dimension_separator,
fill_value=fill_value,
order=order,
Expand All @@ -214,7 +209,7 @@ async def create(
exists_ok=exists_ok,
)
else:
raise ValueError(f"Insupported zarr_format. Got: {zarr_format}")
raise ValueError(f"Unsupported zarr_format. Got: {zarr_format}")

if data is not None:
# insert user-provided data
Expand All @@ -229,7 +224,7 @@ async def _create_v3(
*,
shape: ShapeLike,
dtype: npt.DTypeLike,
chunk_shape: ChunkCoords,
chunks: ShapeLike | ChunkSpec | None = None,
fill_value: Any | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
Expand All @@ -245,8 +240,34 @@ async def _create_v3(
if not exists_ok:
await ensure_no_existing_node(store_path, zarr_format=3)

shape = parse_shapelike(shape)
codecs = list(codecs) if codecs is not None else [BytesCodec()]
array_shape = parse_shapelike(shape)
shard_shape: tuple[int, ...] | None = None
chunk_shape: tuple[int, ...]

# because chunks is an optional typeddict with optional keys, it could be completely empty
# OR None, both of which result in chunks being inferred automatically
if chunks is not None and not (chunks == {}):
if isinstance(chunks, dict):
if "write_shape" in chunks:
chunk_shape = chunks["write_shape"]
if "read_shape" in chunks:
# sharding is only enabled when read_shape and write_shape are specified
# we do not special-case the condition when read_shape and write_shape are the same
shard_shape = chunks["read_shape"]
elif "read_shape" in chunks:
# if read_shape is present, but write_shape is absent, then
# set the chunk_shape to read_shape, and keep shard_shape set to `None`
chunk_shape = chunks["read_shape"]
else:
chunk_shape = parse_shapelike(chunks)
else:
# determine chunking parameters automatically
chunk_shape = _guess_chunks(array_shape, np.dtype(dtype).itemsize)

_codecs = tuple(codecs) if codecs is not None else (BytesCodec(),)

if shard_shape is not None:
_codecs = (ShardingCodec(chunk_shape=shard_shape, codecs=_codecs),)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This effectively hard-codes sharding into the spec, something like a sharded=True flag that might have existed on the CHunkSpec object. How do you expect this to extend to variable chunking or other schemes that might be created in the future?

Copy link
Contributor Author

@d-v-b d-v-b Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This proposal can use whatever specification for variable length chunks we come up with, e.g. tuples of tuples of ints. You could specify variable length chunking with no sharding via something like chunks = {'write_shape': ((10,5), (1,2,3)}, and variable length chunking with sharding via something like chunks = {'write_shape: ((10,5), (1,2,3)), 'read_shape': (1,1)}. The read shape would have to checked for consistency with all the unique chunk shapes in this case. We would of course need to widen the type of ChunkSpec for this to accept tuple[tuple[int, ...]] for the write_shape keys.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like a sharded=True flag that might have existed on the CHunkSpec object.

If we had such a flag on the chunkspec object, then it would semantically collide with read_shape: {'write_shape': (10,10), 'read_shape': (2,2), sharding: False} would not be valid, because there's no way to have read_shape and write_shape differ without sharding. BTW when I say "sharding" i don't mean "the sharding codec", I mean the general concept of packing multiple subchunks into a single file. If a non-codec implementation of sharding emerges, then I would like to imagine that this API could wrap that.


if fill_value is None:
if dtype == np.dtype("bool"):
Expand All @@ -266,12 +287,12 @@ async def _create_v3(
)

metadata = ArrayV3Metadata(
shape=shape,
shape=array_shape,
data_type=dtype,
chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape),
chunk_key_encoding=chunk_key_encoding,
fill_value=fill_value,
codecs=codecs,
codecs=_codecs,
dimension_names=tuple(dimension_names) if dimension_names else None,
attributes=attributes or {},
)
Expand All @@ -288,7 +309,7 @@ async def _create_v2(
*,
shape: ChunkCoords,
dtype: npt.DTypeLike,
chunks: ChunkCoords,
chunks: ChunkSpec | ShapeLike | None,
dimension_separator: Literal[".", "/"] | None = None,
fill_value: None | int | float = None,
order: Literal["C", "F"] | None = None,
Expand All @@ -307,10 +328,28 @@ async def _create_v2(
if dimension_separator is None:
dimension_separator = "."

if chunks is None or chunks == {}:
_chunks = _guess_chunks(shape, np.dtype(dtype).itemsize)
elif isinstance(chunks, dict):
if "write_shape" in chunks:
_chunks = parse_shapelike(chunks["write_shape"])
if "read_shape" in chunks:
if chunks["read_shape"] != chunks["write_shape"]:
msg = "Invalid chunk specification. For zarr v2, read_shape must match write_shape."
raise ValueError(msg)
elif "read_shape" in chunks:
_chunks = parse_shapelike(chunks["read_shape"])
else:
raise ValueError(
f"Invalid chunk specification: {chunks}. Expected a dict compatible with ChunkSpec"
)
else:
_chunks = parse_shapelike(chunks)

metadata = ArrayV2Metadata(
shape=shape,
dtype=np.dtype(dtype),
chunks=chunks,
chunks=_chunks,
order=order,
dimension_separator=dimension_separator,
fill_value=0 if fill_value is None else fill_value,
Expand Down Expand Up @@ -638,7 +677,6 @@ def create(
fill_value: Any | None = None,
attributes: dict[str, JSON] | None = None,
# v3 only
chunk_shape: ChunkCoords | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
| tuple[Literal["default"], Literal[".", "/"]]
Expand All @@ -648,7 +686,7 @@ def create(
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
# v2 only
chunks: ChunkCoords | None = None,
chunks: ChunkSpec | ShapeLike | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
Expand All @@ -664,7 +702,6 @@ def create(
zarr_format=zarr_format,
attributes=attributes,
fill_value=fill_value,
chunk_shape=chunk_shape,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
dimension_names=dimension_names,
Expand Down
14 changes: 3 additions & 11 deletions src/zarr/core/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from zarr.abc.metadata import Metadata
from zarr.abc.store import set_or_delete
from zarr.core.array import Array, AsyncArray
from zarr.core.array import Array, AsyncArray, ChunkSpec
from zarr.core.attributes import Attributes
from zarr.core.buffer import default_buffer_prototype
from zarr.core.common import (
Expand Down Expand Up @@ -371,7 +371,6 @@ async def create_array(
fill_value: Any | None = None,
attributes: dict[str, JSON] | None = None,
# v3 only
chunk_shape: ChunkCoords | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
| tuple[Literal["default"], Literal[".", "/"]]
Expand All @@ -381,7 +380,7 @@ async def create_array(
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
# v2 only
chunks: ShapeLike | None = None,
chunks: ChunkSpec | ShapeLike | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
Expand Down Expand Up @@ -435,7 +434,6 @@ async def create_array(
self.store_path / name,
shape=shape,
dtype=dtype,
chunk_shape=chunk_shape,
fill_value=fill_value,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
Expand Down Expand Up @@ -896,7 +894,6 @@ def create_array(
fill_value: Any | None = None,
attributes: dict[str, JSON] | None = None,
# v3 only
chunk_shape: ChunkCoords | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
| tuple[Literal["default"], Literal[".", "/"]]
Expand All @@ -905,8 +902,7 @@ def create_array(
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
# v2 only
chunks: ShapeLike | None = None,
chunks: ChunkSpec | ShapeLike | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
Expand Down Expand Up @@ -966,7 +962,6 @@ def create_array(
dtype=dtype,
fill_value=fill_value,
attributes=attributes,
chunk_shape=chunk_shape,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
dimension_names=dimension_names,
Expand Down Expand Up @@ -1094,8 +1089,6 @@ def array(
dtype: npt.DTypeLike = "float64",
fill_value: Any | None = None,
attributes: dict[str, JSON] | None = None,
# v3 only
chunk_shape: ChunkCoords | None = None,
chunk_key_encoding: (
ChunkKeyEncoding
| tuple[Literal["default"], Literal[".", "/"]]
Expand Down Expand Up @@ -1166,7 +1159,6 @@ def array(
dtype=dtype,
fill_value=fill_value,
attributes=attributes,
chunk_shape=chunk_shape,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
dimension_names=dimension_names,
Expand Down
2 changes: 1 addition & 1 deletion tests/v3/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_create_array(memory_store: Store) -> None:
assert z.shape == (100,)

# create array, overwrite, specify chunk shape
z = create(shape=200, chunk_shape=20, store=store, overwrite=True)
z = create(shape=200, chunks=20, store=store, overwrite=True)
assert isinstance(z, Array)
assert z.shape == (200,)
assert z.chunks == (20,)
Expand Down
Loading