Passing preprocessor to MultiZarrToZarr raises PicklingError #616
@chuckwondo Thanks for opening up a separate issue! I'll see if I can replicate your error and provide any help.
I found what I believe should be a valid means of removing the offending variable:

```python
import os
from tempfile import TemporaryDirectory

import apache_beam as beam
import pandas as pd
import xarray as xr
from kerchunk.combine import drop  # import handy-dandy drop function
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    WriteCombinedReference,
)

url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time").prune(2)

temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "ceda-monthly-daytime-lst"
target_store = os.path.join(target_root, store_name)
os.mkdir(target_store)

transforms = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(file_type=pattern.file_type)
    | CombineReferences(
        concat_dims=["time"],
        identical_dims=["lat", "lon", "channel"],
        mzz_kwargs={"preprocess": drop("lst_unc_sys")},  # drop offending var
    )
    | WriteCombinedReference(target_root=target_root, store_name=store_name)
)

print(f"{pattern=}")
print(f"{pattern.file_type=}")
print(f"{target_store=} (exists: {os.path.isdir(target_store)})")
print(f"{transforms=}")

with beam.Pipeline() as p:
    p | transforms  # type: ignore[reportUnusedExpression]

with xr.open_zarr(target_store) as ds:
    print(ds)
```

Running the above produces the following:
Since the pickling error indicates that the issue concerns a nested function (the function returned by kerchunk's `drop`), I replaced it with a function defined at module level. However, I also enabled kerchunk debug logging in an attempt to gain a bit of insight, and it seems to reveal that the function to drop the offending var successfully drops the var from the first of the two pruned datasets, presumably because that occurs in the main process, but the attempt to pickle the function to send to a separate process for handling the second dataset fails.

```python
import logging
import os
from tempfile import TemporaryDirectory

import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    WriteCombinedReference,
)

logging.getLogger("kerchunk").addHandler(logging.StreamHandler())
logging.getLogger("kerchunk").setLevel(logging.DEBUG)

url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time").prune(2)

temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "ceda-monthly-daytime-lst"
target_store = os.path.join(target_root, store_name)
os.mkdir(target_store)

def drop_lst_unc_sys(refs):
    for k in list(refs):
        if k.startswith("lst_unc_sys"):
            refs.pop(k)
    return refs

transforms = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(file_type=pattern.file_type)
    | CombineReferences(
        concat_dims=["time"],
        identical_dims=["lat", "lon", "channel"],
        mzz_kwargs={"preprocess": drop_lst_unc_sys},  # drop offending var
    )
    | WriteCombinedReference(target_root=target_root, store_name=store_name)
)

print(f"{pattern=}")
print(f"{pattern.file_type=}")
print(list(pattern.items()))
print(f"{target_store=} (exists: {os.path.isdir(target_store)})")
print(f"{transforms=}")

with beam.Pipeline() as p:
    p | transforms  # type: ignore[reportUnusedExpression]

with xr.open_zarr(target_store) as ds:
    print(ds)
```

Here is the output from running this modified version, with kerchunk debug logging:
@cisaacstern is this a case where cloudpickle would help?
I'm not sure. FWIW, this exact use case is captured in our integration tests here.
But I am not running that test currently, precisely because I hit this same pickling issue:
I suspect …
Rather than …
@chuckwondo unfortunately I do not have a good sense of how to fix this at the moment. I've renamed this issue to reflect the root error here, which will take some greater effort to resolve. In the meantime, I suggest proceeding with #614 as you suggest.
@cisaacstern, I believe I found how this is supposed to be addressed, but it still seems to have no effect. My initial sense of why this still doesn't work is that there may be a bug in Apache Beam, but I have further digging to do. When constructing a `Pipeline`, you can supply `PipelineOptions` with `save_main_session=True`, which is intended to make module-level definitions available to workers. Here are a couple of examples (I suspect there are more) showing the use of this option:
In addition, I found some other Beam issues that seem related, and which indicate that perhaps using "cloudpickle" as the pickle library when "save_main_session" is True might do the trick. Unfortunately, specifying cloudpickle also seems to have no effect: the pickling exception still occurs at the same point, so cloudpickle doesn't appear to be used at all. Oddly, when I specify an unknown pickling library, Beam complains (e.g., …).
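For what it's worth, the distinction at the heart of this thread can be reproduced without Beam at all: the stdlib pickler refuses a closure like the one kerchunk's `drop` returns, while a module-level function pickles fine, and cloudpickle (if installed) serializes the closure by value. Note that `drop_factory` below is a hypothetical stand-in sketching the shape of `kerchunk.combine.drop`, not its actual implementation:

```python
import pickle

def drop_factory(field):
    # Hypothetical stand-in for kerchunk.combine.drop: returns a nested function.
    def preprocess(refs):
        return {k: v for k, v in refs.items() if not k.startswith(field)}
    return preprocess

def drop_lst_unc_sys(refs):
    # Module-level equivalent: the stdlib pickler serializes this by reference.
    return {k: v for k, v in refs.items() if not k.startswith("lst_unc_sys")}

# The closure cannot be pickled by the stdlib pickler...
try:
    pickle.dumps(drop_factory("lst_unc_sys"))
    closure_pickled = True
except (AttributeError, pickle.PicklingError):
    closure_pickled = False
print(closure_pickled)  # False

# ...but the module-level function round-trips fine.
restored = pickle.loads(pickle.dumps(drop_lst_unc_sys))
print(restored({"lst_unc_sys/0": b"x", "lat/0": b"y"}))  # {'lat/0': b'y'}

# cloudpickle serializes the closure by value, so it succeeds where pickle fails.
try:
    import cloudpickle
    fn = cloudpickle.loads(cloudpickle.dumps(drop_factory("lst_unc_sys")))
    print(fn({"lst_unc_sys/0": b"x", "lat/0": b"y"}))
except ImportError:
    pass  # cloudpickle not installed
```

This is only the serialization story in isolation; whether the configured pickle library is actually used is up to Beam, which is what the rest of this thread is about.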
Here is the latest version of my code that makes use of these options, but still fails with the identical pickling error:

```python
from pathlib import Path
from typing import Any, Mapping

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from kerchunk.combine import drop
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    WriteCombinedReference,
)

def drop_lst_unc_sys(refs: Mapping[str, Any]) -> Mapping[str, Any]:
    return {k: v for k, v in refs.items() if not k.startswith("lst_unc_sys")}

# Monthly data spans from 1995-08 through 2020-12, with the exception of two
# gaps: (1) 1996-01 through 1996-06 and (2) 2001-02 through 2001-06.
months = (
    pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
    .difference(pd.date_range("1996-01", "1996-06", freq=pd.offsets.MonthBegin()))
    .difference(pd.date_range("2001-02", "2001-06", freq=pd.offsets.MonthBegin()))
)
url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time").prune(1)

target_root = Path(__file__).parent.absolute()
store_name = "ceda-monthly-daytime-lst"
(target_root / store_name).mkdir(parents=True, exist_ok=True)

transforms = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(file_type=pattern.file_type)
    | CombineReferences(
        concat_dims=["time"],
        identical_dims=["lat", "lon", "channel"],
        mzz_kwargs={"preprocess": drop_lst_unc_sys},
        # Using next line instead of previous line produces same pickling error
        # mzz_kwargs={"preprocess": drop("lst_unc_sys")},
    )
    | WriteCombinedReference(str(target_root), store_name)
)

options = PipelineOptions(pickle_library="cloudpickle", save_main_session=True)
with beam.Pipeline(options=options) as p:
    _ = p | transforms
```

Further, I'm using the following (via conda-forge):
Given that this seems to be an Apache Beam problem, shall I open a new issue over there?
@chuckwondo thanks for digging into this further. Responses below.
When I attended Beam Summit 2022, I spoke with a number of Beam maintainers who privately expressed skepticism about this option, describing it as a "hack", which I mention to say that if it doesn't perform as expected, that's perhaps to be expected. These same maintainers pointed to using a setup.py file as a more robust way of capturing the local context for a pipeline. Perhaps this is worth a try. Though the docs describe it as a way of managing "multiple file dependencies", IIUC it may also help capture the module-level namespace. We have an open issue to support this in the Pangeo Forge CLI.
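For reference, a minimal sketch of what that setup.py approach might look like; the distribution, module, and dependency names here are purely illustrative, and what a given runner needs may differ:

```python
# setup.py (sketch): packages local modules so Beam workers can import them,
# rather than relying on save_main_session to pickle the __main__ namespace.
# Passed to Beam via the --setup_file pipeline option, e.g.:
#   python recipe.py --setup_file=./setup.py
import setuptools

setuptools.setup(
    name="my-lst-recipe",  # hypothetical distribution name
    version="0.1.0",
    py_modules=["recipe"],  # hypothetical module defining drop_lst_unc_sys
    install_requires=["pangeo-forge-recipes", "kerchunk"],
)
```

With the preprocessing function living in an importable module rather than the main session, workers can resolve it by reference instead of needing it pickled by value.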
Not all pipeline options are supported by all runners, so I suspect the issue here may be that the local direct runner does not support cloudpickle? We do use cloudpickle in the CLI for GCP Dataflow deployments, where I can confirm that it definitely does get used, and has solved certain pickling issues for us there.
You're certainly welcome to, and it probably is a good idea to bring this issue to their attention, though at the risk of stating the obvious, I wouldn't expect things to move quickly there, given the scale of the project. If you do want to open the issue, IMHO:
In terms of ideas for unblocking this without some miraculous fix by the Beam maintainers, I would suggest as a next step trying this with a …
Aha! It is indeed a bug in Apache Beam, which I just reported (and have a fix, as described in the bug): apache/beam#28558
Hi @chuckwondo and @cisaacstern, I think I had a similar problem. I used a local workaround which resolved it at the time: apache/beam#21615 (comment), where I modified https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L570 as follows:

```python
try:
    pickle_library = self._options.view_as(SetupOptions).pickle_library
    if pickle_library:
        pickler.set_library(pickle_library)
    pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
```

The issue has apparently been fixed, so the above line number 570 in `pipeline.py` may no longer apply.
Hi @derekocallaghan, thanks for jumping in. It appears that fix has made it into the version I'm using (2.50.0), although it appears early on in … What I encountered is that when …
To possibly bypass the memory issue I reported in #614, I've attempted to use kerchunk instead, based upon the HDF Reference Recipe for CMIP6, but also to no avail:
Running this version results in the following, so I don't know if there's a way to eliminate the problematic `length_scale` coordinate that `lst_unc_sys` uses. Also, I don't understand why I'm seeing the UserWarning about the time coord:

Originally posted by @chuckwondo in #614 (comment)