diff --git a/.github/workflows/test-integration.yaml b/.github/workflows/test-integration.yaml index 9aee6ec6..84437818 100644 --- a/.github/workflows/test-integration.yaml +++ b/.github/workflows/test-integration.yaml @@ -33,6 +33,7 @@ jobs: runner-version: [ "pangeo-forge-runner==0.9.1", "pangeo-forge-runner==0.9.2", + "pangeo-forge-runner==0.9.3", ] steps: - uses: actions/checkout@v4 diff --git a/examples/feedstock/hrrr_kerchunk_concat_step.py b/examples/feedstock/hrrr_kerchunk_concat_step.py index a0fce174..daebc768 100644 --- a/examples/feedstock/hrrr_kerchunk_concat_step.py +++ b/examples/feedstock/hrrr_kerchunk_concat_step.py @@ -53,7 +53,6 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: concat_dims=pattern.concat_dims, identical_dims=identical_dims, precombine_inputs=True, - remote_protocol=remote_protocol, ) | "Test dataset" >> beam.Map(test_ds) ) diff --git a/pangeo_forge_recipes/combiners.py b/pangeo_forge_recipes/combiners.py index 0a200711..5b652c2c 100644 --- a/pangeo_forge_recipes/combiners.py +++ b/pangeo_forge_recipes/combiners.py @@ -65,7 +65,7 @@ class CombineMultiZarrToZarr(beam.CombineFn): along a dimension that does not exist in the individual inputs. In this latter case, precombining adds the additional dimension to the input so that its dimensionality will match that of the accumulator. - :param storage_options: Storage options dict to pass to the MultiZarrToZarr + :param target_options: Target options dict to pass to the MultiZarrToZarr """ diff --git a/pangeo_forge_recipes/storage.py b/pangeo_forge_recipes/storage.py index 42e888f6..040bffaa 100644 --- a/pangeo_forge_recipes/storage.py +++ b/pangeo_forge_recipes/storage.py @@ -9,8 +9,8 @@ import unicodedata from abc import ABC, abstractmethod from contextlib import contextmanager -from dataclasses import dataclass, replace -from typing import Iterator, Optional, Union +from dataclasses import dataclass, field, replace +from typing import Any, Dict, Iterator, Optional, Union from urllib.parse import parse_qs, urlencode, urlparse, urlunparse import fsspec @@ -87,10 +87,13 @@ class FSSpecTarget(AbstractTarget): :param fs: The filesystem object we are writing to. :param root_path: The path under which the target data will be stored. + :param fsspec_kwargs: The fsspec kwargs that can be reused as + `target_options` and `remote_options` for fsspec class instantiation """ fs: fsspec.AbstractFileSystem root_path: str = "" + fsspec_kwargs: Dict[Any, Any] = field(default_factory=dict) def __truediv__(self, suffix: str) -> FSSpecTarget: """ @@ -106,6 +109,20 @@ def from_url(cls, url: str): assert len(root_paths) == 1 return cls(fs, root_paths[0]) + def get_fsspec_remote_protocol(self): + """fsspec implementation-specific remote protocal""" + fsspec_protocol = self.fs.protocol + if isinstance(fsspec_protocol, str): + return fsspec_protocol + elif isinstance(fsspec_protocol, tuple): + return fsspec_protocol[0] + elif isinstance(fsspec_protocol, list): + return fsspec_protocol[0] + else: + raise ValueError( + f"could not resolve fsspec protocol '{fsspec_protocol}' from underlying filesystem" + ) + def get_mapper(self) -> fsspec.mapping.FSMap: """Get a mutable mapping object suitable for storing Zarr data.""" return FSStore(self.root_path, fs=self.fs) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index da22cf2f..ef52f420 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -459,7 +459,6 @@ class CombineReferences(beam.PTransform): precombine_inputs: bool = False def expand(self, references: beam.PCollection) -> beam.PCollection: - return references | beam.CombineGlobally( CombineMultiZarrToZarr( concat_dims=self.concat_dims, @@ -482,9 +481,6 @@ class WriteReference(beam.PTransform, ZarrWriterMixin): will be appended to this prefix to create a full path. :param output_file_name: Name to give the output references file (``.json`` or ``.parquet`` suffix). - :param target_options: Storage options for opening target files - :param remote_options: Storage options for opening remote files - :param remote_protocol: If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: "s3", "gcp", "https", etc. :param mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. """ @@ -495,9 +491,6 @@ class WriteReference(beam.PTransform, ZarrWriterMixin): default_factory=RequiredAtRuntimeDefault ) output_file_name: str = "reference.json" - target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True}) - remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True}) - remote_protocol: Optional[str] = None mzz_kwargs: dict = field(default_factory=dict) def expand(self, references: beam.PCollection) -> beam.PCollection: @@ -506,9 +499,6 @@ def expand(self, references: beam.PCollection) -> beam.PCollection: full_target=self.get_full_target(), concat_dims=self.concat_dims, output_file_name=self.output_file_name, - target_options=self.target_options, - remote_options=self.remote_options, - remote_protocol=self.remote_protocol, mzz_kwargs=self.mzz_kwargs, ) @@ -520,10 +510,6 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin): :param store_name: Zarr store will be created with this name under ``target_root``. :param concat_dims: Dimensions along which to concatenate inputs. :param identical_dims: Dimensions shared among all inputs. - :param target_options: Storage options for opening target files - :param remote_options: Storage options for opening remote files - :param remote_protocol: If files are accessed over the network, provide the remote protocol - over which they are accessed. e.g.: "s3", "gcp", "https", etc. :param mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. :param precombine_inputs: If ``True``, precombine each input with itself, using ``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator. @@ -543,9 +529,6 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin): store_name: str concat_dims: List[str] identical_dims: List[str] - target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True}) - remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True}) - remote_protocol: Optional[str] = None mzz_kwargs: dict = field(default_factory=dict) precombine_inputs: bool = False target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field( @@ -554,14 +537,18 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin): output_file_name: str = "reference.json" def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage.FSStore]: + # unpack fsspec options that will be used below for transforms without dep injection + storage_options = self.target_root.fsspec_kwargs # type: ignore[union-attr] + remote_protocol = self.target_root.get_fsspec_remote_protocol() # type: ignore[union-attr] + return ( references | CombineReferences( concat_dims=self.concat_dims, identical_dims=self.identical_dims, - target_options=self.target_options, - remote_options=self.remote_options, - remote_protocol=self.remote_protocol, + target_options=storage_options, + remote_options=storage_options, + remote_protocol=remote_protocol, mzz_kwargs=self.mzz_kwargs, precombine_inputs=self.precombine_inputs, ) @@ -570,9 +557,6 @@ def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage. concat_dims=self.concat_dims, target_root=self.target_root, output_file_name=self.output_file_name, - target_options=self.target_options, - remote_options=self.remote_options, - remote_protocol=self.remote_protocol, ) ) diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index 1902c744..41c013bf 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -119,9 +119,6 @@ def write_combined_reference( full_target: FSSpecTarget, concat_dims: List[str], output_file_name: str, - target_options: Optional[Dict] = {"anon": True}, - remote_options: Optional[Dict] = {"anon": True}, - remote_protocol: Optional[str] = None, refs_per_component: int = 1000, mzz_kwargs: Optional[Dict] = None, ) -> zarr.storage.FSStore: @@ -129,9 +126,17 @@ def write_combined_reference( file_ext = os.path.splitext(output_file_name)[-1] outpath = full_target._full_path(output_file_name) + import ujson # type: ignore + + # unpack fsspec options that will be used below for call sites without dep injection + storage_options = full_target.fsspec_kwargs # type: ignore[union-attr] + remote_protocol = full_target.get_fsspec_remote_protocol() # type: ignore[union-attr] + # If reference is a ReferenceFileSystem, write to json if isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem): - reference.fs.save_json(outpath, **remote_options) + # context manager reuses dep injected auth credentials without passing storage options + with full_target.fs.open(outpath, "wb") as f: + f.write(ujson.dumps(reference.fs.references).encode()) elif file_ext == ".parquet": # Creates empty parquet store to be written to @@ -146,8 +151,8 @@ def write_combined_reference( MultiZarrToZarr( [reference], concat_dims=concat_dims, - target_options=target_options, - remote_options=remote_options, + target_options=storage_options, + remote_options=storage_options, remote_protocol=remote_protocol, out=out, **mzz_kwargs, @@ -160,8 +165,15 @@ def write_combined_reference( raise NotImplementedError(f"{file_ext = } not supported.") return ReferenceFileSystem( outpath, - target_options=target_options, - remote_options=remote_options, + target_options=storage_options, + # NOTE: `target_protocol` is required here b/c + # fsspec classes are inconsistent about deriving + # protocols if they are not passed. In this case ReferenceFileSystem + # decides how to read a reference based on `target_protocol` before + # it is automagically derived unfortunately + # https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/reference.py#L650-L663 + target_protocol=remote_protocol, + remote_options=storage_options, remote_protocol=remote_protocol, lazy=True, ).get_mapper() diff --git a/pyproject.toml b/pyproject.toml index b9bc45ad..5c827284 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ test = [ "pytest-sugar", "pytest-timeout", "s3fs", + "gcsfs", "scipy", ] @@ -76,7 +77,7 @@ line-length = 100 [tool.isort] known_first_party = "pangeo_forge_recipes" -known_third_party = ["aiohttp", "apache_beam", "cftime", "click", "dask", "fsspec", "kerchunk", "numpy", "pandas", "pytest", "pytest_lazyfixture", "s3fs", "xarray", "zarr"] +known_third_party = ["aiohttp", "apache_beam", "cftime", "click", "dask", "fsspec", "gcsfs", "kerchunk", "numpy", "packaging", "pandas", "pytest", "pytest_lazyfixture", "s3fs", "xarray", "zarr"] multi_line_output = 3 include_trailing_comma = true force_grid_wrap = 0 diff --git a/tests/conftest.py b/tests/conftest.py index bf0238df..e9ce456d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -488,12 +488,6 @@ def tmp_target(tmpdir_factory): return FSSpecTarget(fs, path) -@pytest.fixture() -def tmp_target_url(tmpdir_factory): - path = str(tmpdir_factory.mktemp("target.zarr")) - return path - - @pytest.fixture() def tmp_cache(tmpdir_factory): path = str(tmpdir_factory.mktemp("cache")) diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index 38c4a0de..009f4410 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -36,7 +36,7 @@ def test_xarray_zarr( daily_xarray_dataset, netcdf_local_file_pattern, pipeline, - tmp_target_url, + tmp_target, target_chunks, ): pattern = netcdf_local_file_pattern @@ -46,14 +46,14 @@ def test_xarray_zarr( | beam.Create(pattern.items()) | OpenWithXarray(file_type=pattern.file_type) | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="store", target_chunks=target_chunks, combine_dims=pattern.combine_dim_keys, ) ) - ds = xr.open_dataset(os.path.join(tmp_target_url, "store"), engine="zarr") + ds = xr.open_dataset(os.path.join(tmp_target.root_path, "store"), engine="zarr") assert ds.time.encoding["chunks"] == (target_chunks["time"],) xr.testing.assert_equal(ds.load(), daily_xarray_dataset) @@ -62,7 +62,7 @@ def test_xarray_zarr_subpath( daily_xarray_dataset, netcdf_local_file_pattern_sequential, pipeline, - tmp_target_url, + tmp_target, ): pattern = netcdf_local_file_pattern_sequential with pipeline as p: @@ -71,13 +71,13 @@ def test_xarray_zarr_subpath( | beam.Create(pattern.items()) | OpenWithXarray(file_type=pattern.file_type) | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="subpath", combine_dims=pattern.combine_dim_keys, ) ) - ds = xr.open_dataset(os.path.join(tmp_target_url, "subpath"), engine="zarr") + ds = xr.open_dataset(os.path.join(tmp_target.root_path, "subpath"), engine="zarr") xr.testing.assert_equal(ds.load(), daily_xarray_dataset) @@ -86,7 +86,7 @@ def test_reference_netcdf( daily_xarray_dataset, netcdf_local_file_pattern_sequential, pipeline, - tmp_target_url, + tmp_target, output_file_name, ): pattern = netcdf_local_file_pattern_sequential @@ -98,13 +98,13 @@ def test_reference_netcdf( | OpenWithKerchunk(file_type=pattern.file_type) | WriteCombinedReference( identical_dims=["lat", "lon"], - target_root=tmp_target_url, + target_root=tmp_target, store_name=store_name, concat_dims=["time"], output_file_name=output_file_name, ) ) - full_path = os.path.join(tmp_target_url, store_name, output_file_name) + full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) file_ext = os.path.splitext(output_file_name)[-1] if file_ext == ".json": mapper = fsspec.get_mapper("reference://", fo=full_path) @@ -130,7 +130,7 @@ def test_reference_netcdf( ) def test_reference_grib( pipeline, - tmp_target_url, + tmp_target, ): # This test adapted from: # https://github.com/fsspec/kerchunk/blob/33b00d60d02b0da3f05ccee70d6ebc42d8e09932/kerchunk/tests/test_grib.py#L14-L31 @@ -148,11 +148,11 @@ def test_reference_grib( | WriteCombinedReference( concat_dims=[pattern.concat_dims[0]], identical_dims=["latitude", "longitude"], - target_root=tmp_target_url, + target_root=tmp_target, store_name=store_name, ) ) - full_path = os.path.join(tmp_target_url, store_name, "reference.json") + full_path = os.path.join(tmp_target.root_path, store_name, "reference.json") mapper = fsspec.get_mapper("reference://", fo=full_path) ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False}) assert ds.attrs["GRIB_centre"] == "cwao" diff --git a/tests/test_integration.py b/tests/test_integration.py index 7ee49516..0c00afa8 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -3,9 +3,11 @@ import secrets import subprocess import time +from importlib.metadata import version from pathlib import Path import pytest +from packaging.version import parse as parse_version # Run only when the `--run-integration` option is passed. # See also `pytest_addoption` in conftest. Reference: @@ -119,6 +121,10 @@ def test_integration(confpath_option: str, recipe_id: str, request): if recipe_id in xfails: pytest.xfail(xfails[recipe_id]) + runner_version = parse_version(version("pangeo-forge-runner")) + if recipe_id == "hrrr-kerchunk-concat-step" and runner_version <= parse_version("0.9.2"): + pytest.xfail("pg-runner version <= 0.9.2 didn't pass storage options") + confpath = request.getfixturevalue(confpath_option) bake_script = (EXAMPLES / "runner-commands" / "bake.sh").absolute().as_posix() diff --git a/tests/test_storage.py b/tests/test_storage.py index 1a1e2da7..b621d8f8 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -4,6 +4,8 @@ import pytest from fsspec.implementations.http import HTTPFileSystem from fsspec.implementations.local import LocalFileSystem +from gcsfs import GCSFileSystem +from s3fs import S3FileSystem from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget @@ -95,3 +97,20 @@ def test_suffix(tmp_path): assert str((FSSpecTarget(LocalFileSystem(), tmp_path) / "test").root_path) == str( tmp_path / "test" ) + + +@pytest.mark.parametrize("fs_cls", [LocalFileSystem, HTTPFileSystem, S3FileSystem, GCSFileSystem]) +def test_target_storage_get_remote_protocol(fs_cls, monkeypatch): + # we need to use patch here for s3fs and gcsfs b/c they try to do so much on __init__ + monkeypatch.setattr("s3fs.S3FileSystem.__init__", lambda x: None) + monkeypatch.setattr("gcsfs.GCSFileSystem.__init__", lambda x: None) + monkeypatch.setattr("pangeo_forge_recipes.storage.FSSpecTarget.__post_init__", lambda x: None) + target_root = FSSpecTarget(fs_cls()) + if isinstance(target_root, LocalFileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "local" + elif isinstance(target_root, HTTPFileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "http" + elif isinstance(target_root, S3FileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "s3" + elif isinstance(target_root, GCSFileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "gcs" diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 0b2a3532..2451600c 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -8,7 +8,7 @@ from pangeo_forge_recipes.aggregation import dataset_to_schema from pangeo_forge_recipes.patterns import FilePattern, FileType -from pangeo_forge_recipes.storage import CacheFSSpecTarget +from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget from pangeo_forge_recipes.transforms import ( DetermineSchema, IndexItems, @@ -151,7 +151,7 @@ def test_OpenWithKerchunk_direct(pattern_direct, pipeline): @pytest.mark.parametrize("target_chunks", [{}, {"time": 1}, {"time": 2}, {"time": 2, "lon": 9}]) -def test_PrepareZarrTarget(pipeline, tmp_target_url, target_chunks): +def test_PrepareZarrTarget(pipeline, tmp_target, target_chunks): ds = make_ds() schema = dataset_to_schema(ds) @@ -181,7 +181,7 @@ def _check_target(actual): with pipeline as p: input = p | beam.Create([schema]) - target = input | PrepareZarrTarget(target=tmp_target_url, target_chunks=target_chunks) + target = input | PrepareZarrTarget(target=tmp_target, target_chunks=target_chunks) assert_that(target, correct_target()) @@ -246,7 +246,7 @@ def expand(self, pcoll): def test_StoreToZarr_emits_openable_fsstore( pipeline, netcdf_local_file_pattern_sequential, - tmp_target_url, + tmp_target, ): def is_xrdataset(): def _is_xr_dataset(actual): @@ -260,7 +260,7 @@ def _is_xr_dataset(actual): with pipeline as p: datasets = p | beam.Create(pattern.items()) | OpenWithXarray() target_store = datasets | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="test.zarr", combine_dims=pattern.combine_dim_keys, ) @@ -272,7 +272,7 @@ def _is_xr_dataset(actual): def test_StoreToZarr_dynamic_chunking_interface( pipeline: beam.Pipeline, netcdf_local_file_pattern_sequential: FilePattern, - tmp_target_url: str, + tmp_target: FSSpecTarget, daily_xarray_dataset: xr.Dataset, with_kws: bool, ): @@ -305,7 +305,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1): with pipeline as p: datasets = p | beam.Create(pattern.items()) | OpenWithXarray() target_store = datasets | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="test.zarr", combine_dims=pattern.combine_dim_keys, attrs={}, diff --git a/tests/test_writers.py b/tests/test_writers.py index a60f132a..3955116f 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -157,7 +157,7 @@ def test_store_dataset_fragment(temp_store): def test_zarr_consolidate_metadata( netcdf_local_file_pattern, pipeline, - tmp_target_url, + tmp_target, ): pattern = netcdf_local_file_pattern with pipeline as p: @@ -166,14 +166,14 @@ def test_zarr_consolidate_metadata( | beam.Create(pattern.items()) | OpenWithXarray(file_type=pattern.file_type) | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="store", combine_dims=pattern.combine_dim_keys, consolidated_metadata=False, ) | ConsolidateMetadata() ) - zc = zarr.storage.FSStore(os.path.join(tmp_target_url, "store")) + zc = zarr.storage.FSStore(os.path.join(tmp_target.root_path, "store")) assert zc[".zmetadata"] is not None @@ -181,7 +181,7 @@ def test_zarr_consolidate_metadata( def test_reference_netcdf( netcdf_local_file_pattern_sequential, pipeline, - tmp_target_url, + tmp_target, # why are we not using tmp_target? output_file_name, ): @@ -194,7 +194,7 @@ def test_reference_netcdf( | OpenWithKerchunk(file_type=pattern.file_type) | WriteCombinedReference( identical_dims=["lat", "lon"], - target_root=tmp_target_url, + target_root=tmp_target, store_name=store_name, concat_dims=["time"], output_file_name=output_file_name, @@ -202,7 +202,7 @@ def test_reference_netcdf( | ConsolidateMetadata() ) - full_path = os.path.join(tmp_target_url, store_name, output_file_name) + full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) mapper = fsspec.get_mapper("reference://", fo=full_path) assert zarr.open_consolidated(mapper)