From 1938daf6165c3b2639b9a4a7f0f721bb5ba94a42 Mon Sep 17 00:00:00 2001
From: TimFuermann <83589894+TimFuermann@users.noreply.github.com>
Date: Tue, 3 Dec 2024 16:36:39 +0100
Subject: [PATCH 1/6] update retrieve_data in era5.py

- changed download_format to .zip
- added extraction of the downloaded .zip file and merging of the contained
  .nc files
- added url
---
 atlite/datasets/era5.py | 50 ++++++++++++++++++++++++++++++++++-------
 1 file changed, 42 insertions(+), 8 deletions(-)

diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py
index 04e88015..011da0fd 100644
--- a/atlite/datasets/era5.py
+++ b/atlite/datasets/era5.py
@@ -10,6 +10,8 @@
 import logging
 import os
+import io
+import zipfile
 import warnings
 import weakref
 from tempfile import mkstemp
@@ -338,7 +340,15 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
     If you want to track the state of your request go to
     https://cds-beta.climate.copernicus.eu/requests?tab=all
     """
-    request = {"product_type": "reanalysis", "format": "netcdf"}
+
+    # Set url for data download, this allows to switch to different data
+    # sources more easily.
+    url = 'https://cds.climate.copernicus.eu/api'
+
+    request = {"product_type": ["reanalysis"],
+               "data_format": "netcdf",
+               "download_format": "zip"}
+
     request.update(updates)
 
     assert {"year", "month", "variable"}.issubset(
@@ -346,7 +356,9 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
     ), "Need to specify at least 'variable', 'year' and 'month'"
 
     client = cdsapi.Client(
-        info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
+        url = url,
+        info_callback=logger.debug,
+        debug=logging.DEBUG >= logging.root.level
     )
     result = client.retrieve(product, request)
 
@@ -354,7 +366,7 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
         lock = nullcontext()
 
     with lock:
-        fd, target = mkstemp(suffix=".nc", dir=tmpdir)
+        fd, target_zip = mkstemp(suffix=".zip", dir=tmpdir)
         os.close(fd)
 
         # Inform user about data being downloaded as "* variable (year-month)"
@@ -362,12 +374,34 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
         variables = atleast_1d(request["variable"])
         varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
         logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
-        result.download(target)
-
-    ds = xr.open_dataset(target, chunks=chunks or {})
+        result.download(target_zip)
+
+        # Open the .zip file in memory
+        with zipfile.ZipFile(target_zip, "r") as zf:
+            # Identify .nc files inside the .zip
+            nc_files = [name for name in zf.namelist() if name.endswith(".nc")]
+
+            if not nc_files:
+                raise FileNotFoundError("No .nc files found in the downloaded .zip archive.")
+
+            if len(nc_files) == 1:
+                # If there's only one .nc file, read it into memory
+                with zf.open(nc_files[0]) as nc_file:
+                    # Pass the in-memory file-like object to Xarray
+                    ds = xr.open_dataset(io.BytesIO(nc_file.read()), chunks=chunks or {})
+
+            else:
+                # If multiple .nc files, combine them using Xarray
+                datasets = []
+                for nc_file in nc_files:
+                    with zf.open(nc_file) as file:
+                        datasets.append(xr.open_dataset(io.BytesIO(file.read()), chunks=chunks or {}))
+                # Combine datasets along temporal dimension
+                ds = xr.merge(datasets)
+
     if tmpdir is None:
-        logger.debug(f"Adding finalizer for {target}")
-        weakref.finalize(ds._file_obj._manager, noisy_unlink, target)
+        logging.debug(f"Adding finalizer for {target_zip}")
+        weakref.finalize(ds._file_obj._manager, noisy_unlink, target_zip)
 
     return ds
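
As a standalone illustration of the zip handling that PATCH 1/6 introduces: the sketch below is not
part of the patch series. It assumes a finished CDS download already saved as "era5_download.zip"
(a hypothetical file name) and an xarray engine that can read file-like objects (e.g. h5netcdf,
which PATCH 3/6 adds as a dependency); it mirrors the single-file and multi-file branches of the
patch.

import io
import zipfile

import xarray as xr


def open_zipped_netcdf(path, chunks=None):
    """Open every .nc member of a zipped CDS download and return one Dataset."""
    with zipfile.ZipFile(path, "r") as zf:
        nc_files = [name for name in zf.namelist() if name.endswith(".nc")]
        if not nc_files:
            raise FileNotFoundError("No .nc files found in the .zip archive.")
        # Read each member into memory and hand xarray a file-like object.
        datasets = [
            xr.open_dataset(io.BytesIO(zf.read(name)), chunks=chunks or {})
            for name in nc_files
        ]
    # A single member is returned as-is; several members are merged on their
    # shared coordinates (for ERA5 requests, the temporal dimension).
    return datasets[0] if len(datasets) == 1 else xr.merge(datasets)


# Example usage with the assumed file name:
# ds = open_zipped_netcdf("era5_download.zip")
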
From 82b66443f22c00bc17eb2d78fc12100bb4d4be22 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 3 Dec 2024 15:37:31 +0000
Subject: [PATCH 2/6] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 atlite/datasets/era5.py | 52 ++++++++++++++++++++++++-----------------
 1 file changed, 30 insertions(+), 22 deletions(-)

diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py
index 011da0fd..f9c5a64f 100644
--- a/atlite/datasets/era5.py
+++ b/atlite/datasets/era5.py
@@ -8,12 +8,12 @@
 https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation
 """
 
+import io
 import logging
 import os
-import io
-import zipfile
 import warnings
 import weakref
+import zipfile
 from tempfile import mkstemp
 
 import cdsapi
@@ -340,15 +340,17 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
     If you want to track the state of your request go to
     https://cds-beta.climate.copernicus.eu/requests?tab=all
     """
-
-    # Set url for data download, this allows to switch to different data
+
+    # Set url for data download, this allows to switch to different data
     # sources more easily.
-    url = 'https://cds.climate.copernicus.eu/api'
-
-    request = {"product_type": ["reanalysis"],
-               "data_format": "netcdf",
-               "download_format": "zip"}
-
+    url = "https://cds.climate.copernicus.eu/api"
+
+    request = {
+        "product_type": ["reanalysis"],
+        "data_format": "netcdf",
+        "download_format": "zip",
+    }
+
     request.update(updates)
 
     assert {"year", "month", "variable"}.issubset(
@@ -356,9 +358,7 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
     ), "Need to specify at least 'variable', 'year' and 'month'"
 
     client = cdsapi.Client(
-        url = url,
-        info_callback=logger.debug,
-        debug=logging.DEBUG >= logging.root.level
+        url=url, info_callback=logger.debug, debug=logging.DEBUG >= logging.root.level
     )
     result = client.retrieve(product, request)
 
@@ -375,30 +375,38 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
         varstr = "\n\t".join([f"{v} ({timestr})" for v in variables])
         logger.info(f"CDS: Downloading variables\n\t{varstr}\n")
         result.download(target_zip)
-
+
         # Open the .zip file in memory
         with zipfile.ZipFile(target_zip, "r") as zf:
            # Identify .nc files inside the .zip
            nc_files = [name for name in zf.namelist() if name.endswith(".nc")]
-
+
            if not nc_files:
-                raise FileNotFoundError("No .nc files found in the downloaded .zip archive.")
-
+                raise FileNotFoundError(
+                    "No .nc files found in the downloaded .zip archive."
+                )
+
             if len(nc_files) == 1:
                 # If there's only one .nc file, read it into memory
                 with zf.open(nc_files[0]) as nc_file:
                     # Pass the in-memory file-like object to Xarray
-                    ds = xr.open_dataset(io.BytesIO(nc_file.read()), chunks=chunks or {})
-
+                    ds = xr.open_dataset(
+                        io.BytesIO(nc_file.read()), chunks=chunks or {}
+                    )
+
             else:
                 # If multiple .nc files, combine them using Xarray
                 datasets = []
                 for nc_file in nc_files:
                     with zf.open(nc_file) as file:
-                        datasets.append(xr.open_dataset(io.BytesIO(file.read()), chunks=chunks or {}))
+                        datasets.append(
+                            xr.open_dataset(
+                                io.BytesIO(file.read()), chunks=chunks or {}
+                            )
+                        )
                 # Combine datasets along temporal dimension
-                ds = xr.merge(datasets)
-
+                ds = xr.merge(datasets)
+
     if tmpdir is None:
         logging.debug(f"Adding finalizer for {target_zip}")
         weakref.finalize(ds._file_obj._manager, noisy_unlink, target_zip)

From a9869b04a8c8b0a952430de322bcdc30eae1c47b Mon Sep 17 00:00:00 2001
From: TimFuermann <83589894+TimFuermann@users.noreply.github.com>
Date: Wed, 4 Dec 2024 11:03:15 +0100
Subject: [PATCH 3/6] Update pyproject.toml

---
 pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pyproject.toml b/pyproject.toml
index fec16fc2..fcbad374 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,7 @@ dependencies = [
     "numexpr",
     "xarray>=2024.03.0",
     "netcdf4",
+    "h5netcdf",
     "dask>=2021.10.0",
     "toolz",
     "requests",

From c65386dcc772f7dae493549c1fd0275d067d5b39 Mon Sep 17 00:00:00 2001
From: TimFuermann <83589894+TimFuermann@users.noreply.github.com>
Date: Mon, 9 Dec 2024 09:32:03 +0100
Subject: [PATCH 4/6] Update retrieval times in era5 dataset

This is needed because the new CDS API only accepts string values in a valid
data request.
---
 atlite/datasets/era5.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py
index f9c5a64f..1a7d96a4 100644
--- a/atlite/datasets/era5.py
+++ b/atlite/datasets/era5.py
@@ -292,9 +292,9 @@ def retrieval_times(coords, static=False, monthly_requests=False):
     time = coords["time"].to_index()
     if static:
         return {
-            "year": str(time[0].year),
-            "month": str(time[0].month),
-            "day": str(time[0].day),
+            "year": [str(time[0].year)],
+            "month": [str(time[0].month).zfill(2)],
+            "day": [str(time[0].day).zfill(2)],
             "time": time[0].strftime("%H:00"),
         }
 
@@ -306,21 +306,21 @@ def retrieval_times(coords, static=False, monthly_requests=False):
         for month in t.month.unique():
             query = {
                 "year": str(year),
-                "month": str(month),
-                "day": list(t[t.month == month].day.unique()),
+                "month": [str(month).zfill(2)],
+                "day": list(t[t.month == month].day.unique().astype(str).str.zfill(2)),
                 "time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
             }
             times.append(query)
     else:
         query = {
-            "year": str(year),
-            "month": list(t.month.unique()),
-            "day": list(t.day.unique()),
+            "year": [str(year)],
+            "month": list(t.month.unique().astype(str).str.zfill(2)),
+            "day": list(t.day.unique().astype(str).str.zfill(2)),
             "time": ["%02d:00" % h for h in t.hour.unique()],
         }
         times.append(query)
     return times
-
+
 
 def noisy_unlink(path):
     """

From 37f9feaf3d95260690bf6cdfd4c472a67b32c98d Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 9 Dec 2024 08:33:00 +0000
Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 atlite/datasets/era5.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py
index 1a7d96a4..0d683af5 100644
--- a/atlite/datasets/era5.py
+++ b/atlite/datasets/era5.py
@@ -307,7 +307,9 @@ def retrieval_times(coords, static=False, monthly_requests=False):
             query = {
                 "year": str(year),
                 "month": [str(month).zfill(2)],
-                "day": list(t[t.month == month].day.unique().astype(str).str.zfill(2)),
+                "day": list(
+                    t[t.month == month].day.unique().astype(str).str.zfill(2)
+                ),
                 "time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
             }
             times.append(query)
@@ -320,7 +322,7 @@ def retrieval_times(coords, static=False, monthly_requests=False):
             "time": ["%02d:00" % h for h in t.hour.unique()],
         }
         times.append(query)
     return times
-
+
 
 def noisy_unlink(path):
     """

From 31ccd28b5482de3bc305b6860638f4dd76af58ca Mon Sep 17 00:00:00 2001
From: TimFuermann <83589894+TimFuermann@users.noreply.github.com>
Date: Mon, 9 Dec 2024 12:04:39 +0100
Subject: [PATCH 6/6] Update pyproject.toml

Co-authored-by: Lukas Trippe
---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index fcbad374..5a9eb093 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
     "numexpr",
     "xarray>=2024.03.0",
     "netcdf4",
-    "h5netcdf",
+    "h5netcdf", # For new cds api. Maybe make optional / may not be needed with future netcdf4 updates
     "dask>=2021.10.0",
     "toolz",
     "requests",
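
For reference, the string-valued, zero-padded request fields that PATCH 4/6 introduces can be
reproduced outside atlite. The sketch below is not part of the patch series; it uses a made-up
hourly index for 1-2 January 2013 and mirrors the non-monthly branch of retrieval_times().

import pandas as pd

# Made-up coordinate times; retrieval_times() derives these from the cutout coords.
time = pd.date_range("2013-01-01", "2013-01-02 23:00", freq="h")
t = time[time.year == 2013]

# Mirrors the updated query construction: every field becomes a list of strings,
# with months and days zero-padded to two digits, as in the patch.
query = {
    "year": [str(2013)],
    "month": list(t.month.unique().astype(str).str.zfill(2)),
    "day": list(t.day.unique().astype(str).str.zfill(2)),
    "time": ["%02d:00" % h for h in t.hour.unique()],
}

print(query["year"], query["month"], query["day"], query["time"][:2])
# ['2013'] ['01'] ['01', '02'] ['00:00', '01:00']
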