Skip to content

Commit

Permalink
accommodate tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ranchodeluxe committed Jan 17, 2024
1 parent 43601ca commit 599359f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
3 changes: 2 additions & 1 deletion pangeo_forge_recipes/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class FSSpecTarget(AbstractTarget):
:param fs: The filesystem object we are writing to.
:param root_path: The path under which the target data will be stored.
:param fsspec_kwargs: The fsspec kwargs that can be reused as `target_options` and `remote_options` for fsspec class instantiation
:param fsspec_kwargs: The fsspec kwargs that can be reused as
`target_options` and `remote_options` for fsspec class instantiation
"""

fs: fsspec.AbstractFileSystem
Expand Down
14 changes: 11 additions & 3 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,22 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin):
output_file_name: str = "reference.json"

def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage.FSStore]:
# self.target_root in tests are strings so accommodate
if isinstance(self.target_root, FSSpecTarget):
storage_options = self.target_root.fsspec_kwargs
remote_protocol = self.target_root.get_fsspec_remote_protocol()
else:
storage_options = {}
remote_protocol = ''

return (
references
| CombineReferences(
concat_dims=self.concat_dims,
identical_dims=self.identical_dims,
target_options=self.target_root.fsspec_kwargs,
remote_options=self.target_root.fsspec_kwargs,
remote_protocol=self.target_root.get_fsspec_remote_protocol(),
target_options=storage_options,
remote_options=storage_options,
remote_protocol=remote_protocol,
mzz_kwargs=self.mzz_kwargs,
precombine_inputs=self.precombine_inputs,
)
Expand Down
23 changes: 16 additions & 7 deletions pangeo_forge_recipes/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,18 @@ def write_combined_reference(

import ujson # type: ignore

# full_target in tests are strings so accommodate
# setup storage options for fsspec.MultiZarrToZarr and fsspec.ReferenceFileSystem
if isinstance(full_target, FSSpecTarget):
storage_options = full_target.fsspec_kwargs
remote_protocol = full_target.get_fsspec_remote_protocol()
else:
storage_options = {}
remote_protocol = ''

# If reference is a ReferenceFileSystem, write to json
if isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem):
# this context manager reuses dep injected auth credentials without having to pass storage options
# context manager reuses dep injected auth credentials without passing storage options
with full_target.fs.open(outpath, "wb") as f:
f.write(ujson.dumps(reference.fs.references).encode())

Expand All @@ -127,9 +136,9 @@ def write_combined_reference(
MultiZarrToZarr(
[reference],
concat_dims=concat_dims,
target_options=full_target.fsspec_kwargs,
remote_options=full_target.fsspec_kwargs,
remote_protocol=full_target.get_fsspec_remote_protocol(),
target_options=storage_options,
remote_options=storage_options,
remote_protocol=remote_protocol,
out=out,
**mzz_kwargs,
).translate()
Expand All @@ -141,9 +150,9 @@ def write_combined_reference(
raise NotImplementedError(f"{file_ext = } not supported.")
return ReferenceFileSystem(
outpath,
target_options=full_target.fsspec_kwargs,
remote_options=full_target.fsspec_kwargs,
remote_protocol=full_target.get_fsspec_remote_protocol(),
target_options=storage_options,
remote_options=storage_options,
remote_protocol=remote_protocol,
lazy=True,
).get_mapper()

Expand Down

0 comments on commit 599359f

Please sign in to comment.