diff --git a/ci/py3.10.yml b/ci/py3.10.yml
new file mode 100644
index 00000000..b0a37e0c
--- /dev/null
+++ b/ci/py3.10.yml
@@ -0,0 +1,52 @@
+name: pangeo-forge-recipes
+channels:
+  - conda-forge
+dependencies:
+  - python=3.10
+  - aiohttp
+  - apache-beam
+  - black
+  - boto3
+  - cfgrib
+  - cftime
+  - codecov
+  - dask
+  - distributed
+  - fastparquet
+  - fsspec>=2022.1.0
+  - gcsfs>=2022.1.0
+  - graphviz # needed for building tutorial notebooks
+  - h5netcdf
+  - h5py>=3.3.0
+  - hdf5
+  - intake
+  - intake-xarray
+  - kerchunk>=0.1.1
+  - lxml # Optional dep of pydap
+  - matplotlib # needed for building tutorial notebooks
+  - netcdf4
+  - numcodecs
+  - numpy
+  - pandas
+  - pip
+  - pydap
+  # bring back eventually once pynio conda-forge package supports py3.9 and does not
+  # conflict with ujson, which is a dependency of kerchunk's conda-forge feedstock.
+  # See: https://github.com/conda-forge/pynio-feedstock/issues/114
+  # - pynio
+  - pytest
+  - pytest-cov
+  - pytest-lazy-fixture
+  - python-graphviz # needed for building tutorial notebooks
+  - rasterio
+  - requests
+  - rechunker>=0.4.2
+  - scipy
+  - s3fs>=2022.1.0
+  - setuptools
+  - toolz
+  - xarray>=0.18.0
+  - zarr>=2.6.0
+  - pip:
+      - nbmake>=1.3.0 # used in tutorial nb workflow
+      - pytest-timeout
diff --git a/ci/py3.11.yml b/ci/py3.11.yml
new file mode 100644
index 00000000..c3170617
--- /dev/null
+++ b/ci/py3.11.yml
@@ -0,0 +1,52 @@
+name: pangeo-forge-recipes
+channels:
+  - conda-forge
+dependencies:
+  - python=3.11
+  - aiohttp
+  - apache-beam
+  - black
+  - boto3
+  - cfgrib
+  - cftime
+  - codecov
+  - dask
+  - distributed
+  - fastparquet
+  - fsspec>=2022.1.0
+  - gcsfs>=2022.1.0
+  - graphviz # needed for building tutorial notebooks
+  - h5netcdf
+  - h5py>=3.3.0
+  - hdf5
+  - intake
+  - intake-xarray
+  - kerchunk>=0.1.1
+  - lxml # Optional dep of pydap
+  - matplotlib # needed for building tutorial notebooks
+  - netcdf4
+  - numcodecs
+  - numpy
+  - pandas
+  - pip
+  - pydap
+  # bring back eventually once pynio conda-forge package supports py3.9 and does not
+  # conflict with ujson, which is a dependency of kerchunk's conda-forge feedstock.
+  # See: https://github.com/conda-forge/pynio-feedstock/issues/114
+  # - pynio
+  - pytest
+  - pytest-cov
+  - pytest-lazy-fixture
+  - python-graphviz # needed for building tutorial notebooks
+  - rasterio
+  - requests
+  - rechunker>=0.4.2
+  - scipy
+  - s3fs>=2022.1.0
+  - setuptools
+  - toolz
+  - xarray>=0.18.0
+  - zarr>=2.6.0
+  - pip:
+      - nbmake>=1.3.0 # used in tutorial nb workflow
+      - pytest-timeout
diff --git a/ci/py3.9.yml b/ci/py3.9.yml
new file mode 100644
index 00000000..e354350d
--- /dev/null
+++ b/ci/py3.9.yml
@@ -0,0 +1,52 @@
+name: pangeo-forge-recipes
+channels:
+  - conda-forge
+dependencies:
+  - python=3.9
+  - aiohttp
+  - apache-beam
+  - black
+  - boto3
+  - cfgrib
+  - cftime
+  - codecov
+  - dask
+  - distributed
+  - fastparquet
+  - fsspec>=2022.1.0
+  - gcsfs>=2022.1.0
+  - graphviz # needed for building tutorial notebooks
+  - h5netcdf
+  - h5py>=3.3.0
+  - hdf5
+  - intake
+  - intake-xarray
+  - kerchunk>=0.1.1
+  - lxml # Optional dep of pydap
+  - matplotlib # needed for building tutorial notebooks
+  - netcdf4
+  - numcodecs
+  - numpy
+  - pandas
+  - pip
+  - pydap
+  # bring back eventually once pynio conda-forge package supports py3.9 and does not
+  # conflict with ujson, which is a dependency of kerchunk's conda-forge feedstock.
+  # See: https://github.com/conda-forge/pynio-feedstock/issues/114
+  # - pynio
+  - pytest
+  - pytest-cov
+  - pytest-lazy-fixture
+  - python-graphviz # needed for building tutorial notebooks
+  - rasterio
+  - requests
+  - rechunker>=0.4.2
+  - scipy
+  - s3fs>=2022.1.0
+  - setuptools
+  - toolz
+  - xarray>=0.18.0
+  - zarr>=2.6.0
+  - pip:
+      - nbmake>=1.3.0 # used in tutorial nb workflow
+      - pytest-timeout
diff --git a/pangeo_forge_recipes/storage.py b/pangeo_forge_recipes/storage.py
index 94cb8f5b..42e888f6 100644
--- a/pangeo_forge_recipes/storage.py
+++ b/pangeo_forge_recipes/storage.py
@@ -117,13 +117,16 @@ def exists(self, path: str) -> bool:
         """Check that the file is in the cache."""
         return self.fs.exists(self._full_path(path))
 
-    def rm(self, path: str) -> None:
+    def rm(self, path: str, recursive: Optional[bool] = False) -> None:
         """Remove file from the cache."""
-        self.fs.rm(self._full_path(path))
+        self.fs.rm(self._full_path(path), recursive=recursive)
 
     def size(self, path: str) -> int:
         return self.fs.size(self._full_path(path))
 
+    def makedir(self, path: str) -> None:
+        self.fs.makedir(self._full_path(path))
+
     @contextmanager
     def open(self, path: str, **kwargs) -> Iterator[OpenFileType]:
         """Open file with a context manager."""
diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py
index 50b9d409..23584edf 100644
--- a/pangeo_forge_recipes/transforms.py
+++ b/pangeo_forge_recipes/transforms.py
@@ -412,7 +412,6 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
 @dataclass
 class CombineReferences(beam.PTransform):
     """Combines Kerchunk references into a single reference dataset.
-
     :param concat_dims: Dimensions along which to concatenate inputs.
     :param identical_dims: Dimensions shared among all inputs.
     :mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``.
@@ -446,25 +445,28 @@ def expand(self, references: beam.PCollection) -> beam.PCollection:
 @dataclass
 class WriteCombinedReference(beam.PTransform, ZarrWriterMixin):
     """Store a singleton PCollection consisting of a ``kerchunk.combine.MultiZarrToZarr`` object.
-
     :param store_name: Name for the Zarr store. It will be created with this name
       under `target_root`.
     :param target_root: Root path the Zarr store will be created inside;
      `store_name` will be appended to this prefix to create a full path.
-    :param output_json_fname: Name to give the output references file. Must end in ``.json``.
+    :param output_file_name: Name to give the output references file (must have a ``.json`` or ``.parquet`` suffix).
+    :param concat_dims: Dimensions along which to concatenate inputs; passed to
+        ``write_combined_reference`` when writing references in ``.parquet`` format.
""" store_name: str target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field( default_factory=RequiredAtRuntimeDefault ) - output_json_fname: str = "reference.json" + output_file_name: str = "reference.json" + concat_dims: List[str] = field(default_factory=list) def expand(self, reference: beam.PCollection) -> beam.PCollection: return reference | beam.Map( write_combined_reference, full_target=self.get_full_target(), - output_json_fname=self.output_json_fname, + concat_dims=self.concat_dims, + output_file_name=self.output_file_name, ) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index 4ce5b01e..c2b896c2 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -1,9 +1,11 @@ import os -from typing import Protocol, Tuple, Union +from typing import List, Protocol, Tuple, Union +import fsspec import numpy as np import xarray as xr import zarr +from fsspec.implementations.reference import LazyReferenceMapper from kerchunk.combine import MultiZarrToZarr from .patterns import CombineOp, Index @@ -95,23 +97,47 @@ def store_dataset_fragment( def write_combined_reference( reference: MultiZarrToZarr, full_target: FSSpecTarget, - output_json_fname: str, -): + concat_dims: List[str], + output_file_name: str, + refs_per_component: int = 1000, +) -> FSSpecTarget: """Write a kerchunk combined references object to file.""" import ujson # type: ignore - multi_kerchunk = reference.translate() - file_ext = os.path.splitext(output_json_fname)[-1] + file_ext = os.path.splitext(output_file_name)[-1] + + outpath = full_target._full_path(output_file_name) if file_ext == ".json": - outpath = os.path.join(full_target.root_path, output_json_fname) + multi_kerchunk = reference.translate() with full_target.fs.open(outpath, "wb") as f: f.write(ujson.dumps(multi_kerchunk).encode()) + + elif file_ext == ".parquet": + + # Creates empty parquet store to be written to + if full_target.exists(output_file_name): + full_target.rm(output_file_name, recursive=True) + full_target.makedir(output_file_name) + + # kwargs to pass to MultiZarrToZarr + fs = fsspec.filesystem("file") + out = LazyReferenceMapper.create(refs_per_component, outpath, fs) + + # Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet. 
+        MultiZarrToZarr(
+            [reference.translate()], concat_dims=concat_dims, remote_protocol="memory", out=out
+        ).translate()
+
+        # Flush the accumulated references out to the parquet store
+        out.flush()
+
     else:
-        # TODO: implement parquet writer
         raise NotImplementedError(f"{file_ext = } not supported.")
 
+    return full_target
+
 
 class ZarrWriterProtocol(Protocol):
     """Protocol for mixin typing, following best practices described in:
diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py
index e28c902f..d6f78222 100644
--- a/tests/test_end_to_end.py
+++ b/tests/test_end_to_end.py
@@ -10,6 +10,7 @@
 import xarray as xr
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from fsspec.implementations.reference import ReferenceFileSystem
 
 from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence
 from pangeo_forge_recipes.transforms import (
@@ -81,11 +82,13 @@ def test_xarray_zarr_subpath(
     xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
 
 
+@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"])
 def test_reference_netcdf(
     daily_xarray_dataset,
     netcdf_local_file_pattern_sequential,
     pipeline,
     tmp_target_url,
+    output_file_name,
 ):
     pattern = netcdf_local_file_pattern_sequential
     store_name = "daily-xarray-dataset"
@@ -98,12 +101,25 @@
             | WriteCombinedReference(
                 target_root=tmp_target_url,
                 store_name=store_name,
+                concat_dims=["time"],
+                output_file_name=output_file_name,
             )
         )
-    full_path = os.path.join(tmp_target_url, store_name, "reference.json")
-    mapper = fsspec.get_mapper("reference://", fo=full_path)
-    ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
-    xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
+
+    full_path = os.path.join(tmp_target_url, store_name, output_file_name)
+    file_ext = os.path.splitext(output_file_name)[-1]
+
+    if file_ext == ".json":
+        mapper = fsspec.get_mapper("reference://", fo=full_path)
+        ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
+        xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
+
+    elif file_ext == ".parquet":
+        fs = ReferenceFileSystem(
+            full_path, remote_protocol="file", target_protocol="file", lazy=True
+        )
+        ds = xr.open_dataset(fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False})
+        xr.testing.assert_equal(ds.load(), daily_xarray_dataset)
 
 
 @pytest.mark.xfail(
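
Usage note: a minimal sketch of opening the reference stores written by this change, assuming a
local pipeline run; `target_url` and `store_name` are hypothetical placeholders for whatever was
passed to `WriteCombinedReference` (here mirroring the values in `tests/test_end_to_end.py`):

    import os

    import fsspec
    import xarray as xr
    from fsspec.implementations.reference import ReferenceFileSystem

    target_url = "/tmp/target"  # hypothetical target_root used in the pipeline
    store_name = "daily-xarray-dataset"

    # JSON references: fsspec loads the entire reference set into memory up front.
    json_path = os.path.join(target_url, store_name, "reference.json")
    mapper = fsspec.get_mapper("reference://", fo=json_path)
    ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})

    # Parquet references: lazy=True pages reference partitions in on demand, so a very
    # large reference set never has to fit in memory as a single JSON blob.
    parquet_path = os.path.join(target_url, store_name, "reference.parquet")
    fs = ReferenceFileSystem(
        parquet_path, remote_protocol="file", target_protocol="file", lazy=True
    )
    ds = xr.open_dataset(fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False})

That on-demand access is the apparent motivation for the parquet format here: references are
grouped into partitions of `refs_per_component` entries by the `LazyReferenceMapper`, and readers
only load the partitions they touch.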