Skip to content

Commit

Permalink
Make AsyncArray.nchunks_initialized async
Browse files Browse the repository at this point in the history
This changes the API of AsyncArray.nchunks_initialized to
change it from a property to an async function. The motivation
here comes from

1. general cleanliness (a property access calling async functions doing
   I/O feels a bit wrong)
2. Work on Array.info, where I hit a strange error, I think from jumping
   from a

   - sync Array.info_complete ->
   - async AsyncArray.info_complete ->
   - sync AsyncArray.nchunks_initialized ->
   - sync collect_aiterator (async list_prefix)

   With this change, we'll be able to jump from sync to async just once
   at the boundary.

```
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 3011, in info_complete
    return sync(self._async_array.info_complete())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 141, in sync
    raise return_result
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 100, in _runner
    return await coro
           ^^^^^^^^^^
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 1223, in info_complete
    "count_chunks_initialized": self.nchunks_initialized,  # this should be async?
                                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 844, in nchunks_initialized
    return nchunks_initialized(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 3035, in nchunks_initialized
    return len(chunks_initialized(array))
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 3061, in chunks_initialized
    collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path))
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 178, in collect_aiterator
    return sync(_collect_aiterator(data))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 128, in sync
    raise SyncError("Calling sync() from within a running loop")
zarr.core.sync.SyncError: Calling sync() from within a running loop
```
  • Loading branch information
TomAugspurger committed Oct 30, 2024
1 parent 4c3081c commit e684299
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 46 deletions.
89 changes: 49 additions & 40 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
T_ArrayMetadata,
)
from zarr.core.metadata.v3 import parse_node_type_array
from zarr.core.sync import collect_aiterator, sync
from zarr.core.sync import sync
from zarr.errors import MetadataValidationError
from zarr.registry import get_pipeline_class
from zarr.storage import StoreLike, make_store_path
Expand Down Expand Up @@ -829,17 +829,31 @@ def nchunks(self) -> int:
"""
return product(self.cdata_shape)

@property
def nchunks_initialized(self) -> int:
async def nchunks_initialized(self) -> int:
"""
The number of chunks that have been persisted in storage.
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
been persisted to the storage backend.
Returns
-------
int
The number of initialized chunks in the array.
nchunks_initialized : int
The number of chunks that have been initialized.
Notes
-----
On :class:`AsyncArray` this is an asynchronous method, unlike the (synchronous)
property :attr:`Array.nchunks_initialized`.
Examples
--------
>>> arr = await zarr.api.asynchronous.create(shape=(10,), chunks=(2,))
>>> await arr.nchunks_initialized()
0
>>> await arr.setitem(slice(5), 1)
>>> await arr.nchunks_initialized()
3
"""
return nchunks_initialized(self)
return len(await chunks_initialized(self))

def _iter_chunk_coords(
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
Expand Down Expand Up @@ -1492,9 +1506,29 @@ def nbytes(self) -> int:
@property
def nchunks_initialized(self) -> int:
"""
The number of chunks that have been initialized in the stored representation of this array.
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
been persisted to the storage backend.
Returns
-------
nchunks_initialized : int
The number of chunks that have been initialized.
Notes
-----
On :class:`Array` this is a (synchronous) property, unlike the asynchronous method
:meth:`AsyncArray.nchunks_initialized`.
Examples
--------
>>> arr = zarr.create(shape=(10,), chunks=(2,))
>>> arr.nchunks_initialized
0
>>> arr[:5] = 1
>>> arr.nchunks_initialized
3
"""
return self._async_array.nchunks_initialized
return sync(self._async_array.nchunks_initialized())

def _iter_chunk_keys(
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
Expand Down Expand Up @@ -2905,39 +2939,15 @@ def info(self) -> None:
)


def nchunks_initialized(
array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | Array,
) -> int:
"""
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
been persisted to the storage backend.
Parameters
----------
array : Array
The array to inspect.
Returns
-------
nchunks_initialized : int
The number of chunks that have been initialized.
See Also
--------
chunks_initialized
"""
return len(chunks_initialized(array))


def chunks_initialized(
array: Array | AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
async def chunks_initialized(
array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
) -> tuple[str, ...]:
"""
Return the keys of the chunks that have been persisted to the storage backend.
Parameters
----------
array : Array
array : AsyncArray
The array to inspect.
Returns
Expand All @@ -2950,10 +2960,9 @@ def chunks_initialized(
nchunks_initialized
"""
# TODO: make this compose with the underlying async iterator
store_contents = list(
collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path))
)
store_contents = [
x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path)
]
return tuple(chunk_key for chunk_key in array._iter_chunk_keys() if chunk_key in store_contents)


Expand Down
12 changes: 6 additions & 6 deletions tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def test_nchunks(test_cls: type[Array] | type[AsyncArray[Any]], nchunks: int) ->


@pytest.mark.parametrize("test_cls", [Array, AsyncArray[Any]])
def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
async def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
"""
Test that nchunks_initialized accurately returns the number of stored chunks.
"""
Expand All @@ -337,7 +337,7 @@ def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> N
if test_cls == Array:
observed = arr.nchunks_initialized
else:
observed = arr._async_array.nchunks_initialized
observed = await arr._async_array.nchunks_initialized()
assert observed == expected

# delete chunks
Expand All @@ -346,13 +346,13 @@ def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> N
if test_cls == Array:
observed = arr.nchunks_initialized
else:
observed = arr._async_array.nchunks_initialized
observed = await arr._async_array.nchunks_initialized()
expected = arr.nchunks - idx - 1
assert observed == expected


@pytest.mark.parametrize("test_cls", [Array, AsyncArray[Any]])
def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
async def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
"""
Test that chunks_initialized accurately returns the keys of stored chunks.
"""
Expand All @@ -366,9 +366,9 @@ def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> No
arr[region] = 1

if test_cls == Array:
observed = sorted(chunks_initialized(arr))
observed = sorted(await chunks_initialized(arr)) # Why doesn't mypy error here?
else:
observed = sorted(chunks_initialized(arr._async_array))
observed = sorted(await chunks_initialized(arr._async_array))

expected = sorted(keys)
assert observed == expected
Expand Down

0 comments on commit e684299

Please sign in to comment.