Merge pull request #665 from norlandrhagen/consolidate_metadata
Consolidate Metadata Transform
norlandrhagen authored Jan 25, 2024
2 parents 6e40279 + fff47ac commit af3b80f
Showing 5 changed files with 114 additions and 7 deletions.
3 changes: 2 additions & 1 deletion pangeo_forge_recipes/aggregation.py
@@ -278,9 +278,10 @@ def schema_to_zarr(
target_store: zarr.storage.FSStore,
target_chunks: Optional[Dict[str, int]] = None,
attrs: Optional[Dict[str, str]] = None,
consolidated_metadata: Optional[bool] = True,
) -> zarr.storage.FSStore:
"""Initialize a zarr group based on a schema."""
ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs)
# using mode="w" makes this function idempotent
ds.to_zarr(target_store, mode="w", compute=False)
ds.to_zarr(target_store, mode="w", compute=False, consolidated=consolidated_metadata)
return target_store
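
Editor's note: a minimal sketch of what the new flag threads through (the ``example.zarr`` path is hypothetical, and this assumes dask is available for the lazy ``compute=False`` write, as in ``schema_to_zarr``): ``consolidated=`` controls whether xarray writes a single ``.zmetadata`` key at the store root when the group is initialized.

import xarray as xr
import zarr

ds = xr.Dataset({"a": ("x", [1, 2, 3])})
store = zarr.storage.FSStore("example.zarr")  # hypothetical local store

# consolidated=False skips the root ".zmetadata" key entirely
ds.to_zarr(store, mode="w", compute=False, consolidated=False)
assert ".zmetadata" not in store

# consolidated=True (the default above) writes one ".zmetadata" key that
# aggregates every .zgroup/.zarray/.zattrs entry in the store
ds.to_zarr(store, mode="w", compute=False, consolidated=True)
assert ".zmetadata" in store
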
30 changes: 25 additions & 5 deletions pangeo_forge_recipes/transforms.py
@@ -23,7 +23,12 @@
from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop
from .rechunking import combine_fragments, split_fragment
from .storage import CacheFSSpecTarget, FSSpecTarget
from .writers import ZarrWriterMixin, store_dataset_fragment, write_combined_reference
from .writers import (
ZarrWriterMixin,
consolidate_metadata,
store_dataset_fragment,
write_combined_reference,
)

logger = logging.getLogger(__name__)

@@ -359,11 +364,14 @@ class PrepareZarrTarget(beam.PTransform):
If chunking is present in the schema for a given dimension, the length of
the first fragment will be used. Otherwise, the dimension will not be chunked.
:param attrs: Extra group-level attributes to inject into the dataset.
:param consolidated_metadata: Whether ``xarray.to_zarr()``
    writes consolidated metadata. Defaults to True.
"""

target: str | FSSpecTarget
target_chunks: Dict[str, int] = field(default_factory=dict)
attrs: Dict[str, str] = field(default_factory=dict)
consolidated_metadata: Optional[bool] = True

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
if isinstance(self.target, str):
@@ -372,7 +380,11 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
target = self.target
store = target.get_mapper()
initialized_target = pcoll | beam.Map(
schema_to_zarr, target_store=store, target_chunks=self.target_chunks, attrs=self.attrs
schema_to_zarr,
target_store=store,
target_chunks=self.target_chunks,
attrs=self.attrs,
consolidated_metadata=self.consolidated_metadata,
)
return initialized_target

@@ -388,9 +400,13 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
)


# TODO
# - consolidate coords
# - consolidate metadata
@dataclass
class ConsolidateMetadata(beam.PTransform):
"""Calls Zarr Python consolidate_metadata on an existing Zarr store or Kerchunk reference
(https://zarr.readthedocs.io/en/stable/_modules/zarr/convenience.html#consolidate_metadata)"""

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(consolidate_metadata)


@dataclass
@@ -580,6 +596,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
out https://github.com/jbusecke/dynamic_chunks
:param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
:param attrs: Extra group-level attributes to inject into the dataset.
:param consolidated_metadata: Whether ``xarray.to_zarr()``
    writes consolidated metadata. Defaults to True.
"""

# TODO: make it so we don't have to explicitly specify combine_dims
@@ -593,6 +611,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
attrs: Dict[str, str] = field(default_factory=dict)
consolidated_metadata: Optional[bool] = True

def __post_init__(self):
if self.target_chunks and self.dynamic_chunking_fn:
@@ -618,6 +637,7 @@ def expand(
target=self.get_full_target(),
target_chunks=target_chunks,
attrs=self.attrs,
consolidated_metadata=self.consolidated_metadata,
)
n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
singleton_target_store = (
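
Editor's note: a hedged sketch of how the new pieces compose (the target path and the ``pattern`` object are assumptions, not part of this diff): consolidation can happen up front via the new ``consolidated_metadata`` default on ``StoreToZarr``, or be deferred to a final ``ConsolidateMetadata`` step.

import apache_beam as beam
from pangeo_forge_recipes.transforms import (
    ConsolidateMetadata,
    OpenWithXarray,
    StoreToZarr,
)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())  # `pattern` is an assumed FilePattern
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            target_root="/tmp/example",  # hypothetical target root
            store_name="store",
            combine_dims=pattern.combine_dim_keys,
            consolidated_metadata=False,  # defer consolidation...
        )
        | ConsolidateMetadata()  # ...and perform it once, at the end
    )
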
22 changes: 21 additions & 1 deletion pangeo_forge_recipes/writers.py
@@ -67,6 +67,27 @@ def _is_first_in_merge_dim(index):
return True


def consolidate_metadata(store: MutableMapping) -> MutableMapping:
    """Consolidate metadata for a Zarr store or Kerchunk reference.

    :param store: Input Zarr store or Kerchunk reference store
    :type store: MutableMapping
    :return: Output store, post-consolidation
    :rtype: MutableMapping
    """

    import zarr

    if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem):
        # Kerchunk reference: re-open the references as a mapper that
        # zarr.consolidate_metadata can operate on.
        ref_path = store.fs.storage_args[0]
        path = fsspec.get_mapper("reference://", fo=ref_path)
    elif isinstance(store, zarr.storage.FSStore):
        path = store.path
    else:
        raise ValueError(f"Cannot consolidate metadata for store type {type(store)}")

    return zarr.consolidate_metadata(path)


def store_dataset_fragment(
item: Tuple[Index, xr.Dataset], target_store: zarr.storage.FSStore
) -> zarr.storage.FSStore:
@@ -78,7 +99,6 @@ def store_dataset_fragment(

index, ds = item
zgroup = zarr.open_group(target_store)

# TODO: check that the dataset and the index are compatible

# only store coords if this is the first item in a merge dim
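
Editor's note: a hedged sketch of calling the new helper directly, outside a Beam pipeline (``example.zarr`` and ``refs.json`` are hypothetical paths):

import fsspec
import zarr
from pangeo_forge_recipes.writers import consolidate_metadata

# Zarr store: the helper resolves the store path and consolidates in place
store = zarr.storage.FSStore("example.zarr")
consolidate_metadata(store)
assert ".zmetadata" in store

# Kerchunk reference: the helper re-opens the references via "reference://"
# before consolidating
ref_store = fsspec.get_mapper("reference://", fo="refs.json")
consolidate_metadata(ref_store)
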
1 change: 1 addition & 0 deletions tests/test_transforms.py
@@ -310,6 +310,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
combine_dims=pattern.combine_dim_keys,
attrs={},
dynamic_chunking_fn=dynamic_chunking_fn,
consolidated_metadata=True,
**kws,
)
open_store = target_store | OpenZarrStore()
65 changes: 65 additions & 0 deletions tests/test_writers.py
@@ -1,8 +1,19 @@
import os

import apache_beam as beam
import fsspec
import pytest
import xarray as xr
import zarr

from pangeo_forge_recipes.aggregation import schema_to_zarr
from pangeo_forge_recipes.transforms import (
ConsolidateMetadata,
OpenWithKerchunk,
OpenWithXarray,
StoreToZarr,
WriteCombinedReference,
)
from pangeo_forge_recipes.types import CombineOp, Dimension, Index, IndexedPosition, Position
from pangeo_forge_recipes.writers import store_dataset_fragment

@@ -141,3 +152,57 @@ def test_store_dataset_fragment(temp_store):
# assert_identical() doesn't check encoding
# Checking the original time encoding units should be sufficient
assert ds.time.encoding.get("units") == ds_target.time.encoding.get("units")


def test_zarr_consolidate_metadata(
netcdf_local_file_pattern,
pipeline,
tmp_target_url,
):
pattern = netcdf_local_file_pattern
with pipeline as p:
(
p
| beam.Create(pattern.items())
| OpenWithXarray(file_type=pattern.file_type)
| StoreToZarr(
target_root=tmp_target_url,
store_name="store",
combine_dims=pattern.combine_dim_keys,
consolidated_metadata=False,
)
| ConsolidateMetadata()
)
zc = zarr.storage.FSStore(os.path.join(tmp_target_url, "store"))
assert zc[".zmetadata"] is not None


@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
def test_reference_netcdf(
netcdf_local_file_pattern_sequential,
pipeline,
tmp_target_url,
# why are we not using tmp_target?
output_file_name,
):
pattern = netcdf_local_file_pattern_sequential
store_name = "daily-xarray-dataset"
with pipeline as p:
(
p
| beam.Create(pattern.items())
| OpenWithKerchunk(file_type=pattern.file_type)
| WriteCombinedReference(
identical_dims=["lat", "lon"],
target_root=tmp_target_url,
store_name=store_name,
concat_dims=["time"],
output_file_name=output_file_name,
)
| ConsolidateMetadata()
)

full_path = os.path.join(tmp_target_url, store_name, output_file_name)

mapper = fsspec.get_mapper("reference://", fo=full_path)
assert zarr.open_consolidated(mapper)
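
Editor's note: as a follow-on to the assertion above, a hedged sketch of consuming the consolidated reference from xarray (reusing the ``full_path`` built in the test):

import fsspec
import xarray as xr

mapper = fsspec.get_mapper("reference://", fo=full_path)
ds = xr.open_zarr(mapper, consolidated=True)  # single metadata read up front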
