Skip to content

Commit

Permalink
Multiple Bug Fixes (#155)
Browse files Browse the repository at this point in the history
* Moving deps
* Adding better error message for time out
* Change log
  • Loading branch information
NickGeneva authored Nov 21, 2024
1 parent 6d2a6c3 commit a03e2a4
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 93 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed

- Removed `available` function from CDS datasource

### Fixed

### Security

### Dependencies

- Moving several ECMWF dependencies to optional
- Adding minimum version for numpy
- Bump minimum CDS API version for new API
- Moving unique data packages to optional deps

## [0.3.0] - 2024-09-24

### Added
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Earth2Studio is provided under the Apache License 2.0, please see

<!-- Badge links -->

[e2studio_python_img]: https://img.shields.io/badge/Python-3.10%2B-blue?style=flat-square&logo=python
[e2studio_python_img]: https://img.shields.io/badge/Python-3.10%20|%203.11-blue?style=flat-square&logo=python
[e2studio_license_img]: https://img.shields.io/badge/License-Apache%202.0-green?style=flat-square
[e2studio_format_img]: https://img.shields.io/badge/Code%20Style-Black-black?style=flat-square
[e2studio_cov_img]: https://img.shields.io/codecov/c/github/nickgeneva/earth2studio?style=flat-square&logo=codecov
Expand Down
12 changes: 12 additions & 0 deletions docs/userguide/about/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ We encourage users that face package issues to familiarize themselves with the o
model installs and suggested environment set up for the most complete experience.
:::

(data_dependencies)=

## Datasource Dependencies

Some data sources require additional dependencies, libraries or specific Python versions
to install.
To install all dependencies

```bash
pip install earth2studio[data]
```

(model_dependencies)=

## Model Dependencies
Expand Down
3 changes: 2 additions & 1 deletion docs/userguide/components/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ Most data stores offer several additional utilities such as `__contains__`,
{mod}`earth2studio.io.ZarrBackend`:

```{literalinclude} ../../../earth2studio/io/zarr.py
:lines: 53-81
:language: python
:start-after: sphinx - io zarr start
:end-before: sphinx - io zarr end
```

Because of `datetime` compatibility, we recommend using the `ZarrBackend` as a default.
Expand Down
17 changes: 17 additions & 0 deletions docs/userguide/support/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,20 @@ Whether its you're own model or someone else's checkpoint, we are always interes
making Earth2Studio as feature rich as possible for users.
Open an issue to discuss the model you're interested in using / integrating and we can
work out a plan to get it integrated.

## Earth2Studio requires X.Y.Z package or Python version, can I use another?

Earth2Studio has adopted the [scientific python](https://scientific-python.org/specs/)
spec 0 for minimum supported dependencies.
This mean adopting a common time-based policy for dropping dependencies to encourage the
use of modern Python and packages.
This helps improve matainance of the package and security posture.
This does not imply a strict requirement for all functionality and does not apply to
optional packages.

## Install AttributeError: module 'pkgutil' with Python 3.12

Python 3.12 is presently broken due to core dependencies of Earth2Studio blocking the
install.
We are working on resolving this as soon as we can, we suggest downgrading to 3.11 in
the mean time.
112 changes: 37 additions & 75 deletions earth2studio/data/cds.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
# limitations under the License.

import hashlib
import multiprocessing as mp
import os
import pathlib
import shutil
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import Any

import cdsapi
try:
import cdsapi
except ImportError:
cdsapi = None
import numpy as np
import xarray as xr
from loguru import logger
Expand Down Expand Up @@ -71,9 +74,10 @@ class CDS:
Note
----
Additional information on the data repository can be referenced here:
Additional information on the data repository, registration, and authentication can
be referenced here:
- https://cds.climate.copernicus.eu/cdsapp#!/home
- https://cds.climate.copernicus.eu/how-to-api
"""

MAX_BYTE_SIZE = 20000000
Expand All @@ -82,6 +86,12 @@ class CDS:
CDS_LON = np.linspace(0, 359.75, 1440)

def __init__(self, cache: bool = True, verbose: bool = True):
# Optional import not installed error
if cdsapi is None:
raise ImportError(
"cdsapi is not installed, install manually or using `pip install earth2studio[data]`"
)

self._cache = cache
self._verbose = verbose
self.cds_client = cdsapi.Client(
Expand Down Expand Up @@ -152,11 +162,11 @@ def fetch_cds_dataarray(
if isinstance(variables, str):
variables = [variables]
requests = self._build_requests(time, variables)
pbar = tqdm(
total=len(requests),
desc=f"Fetching CDS for {time}",
disable=(not self._verbose),
)
# pbar = tqdm(
# total=len(requests),
# desc=f"Fetching CDS for {time}",
# disable=(not self._verbose),
# )

# Fetch process for getting data off CDS
def _fetch_process(request: CDSRequest, rank: int, return_dict: dict) -> None:
Expand All @@ -168,18 +178,22 @@ def _fetch_process(request: CDSRequest, rank: int, return_dict: dict) -> None:
)
return_dict[i] = grib_file

manager = mp.Manager()
return_dict = manager.dict()
processes = []
return_dict: dict[int, Any] = {}
for i, request in enumerate(requests):
process = mp.Process(target=_fetch_process, args=(request, i, return_dict))
processes.append(process)
process.start()
_fetch_process(request, i, return_dict)

# wait for all processes to complete
for process in processes:
process.join()
pbar.update(1)
# manager = mp.Manager()
# return_dict = manager.dict()
# processes = []
# for i, request in enumerate(requests):
# process = mp.Process(target=_fetch_process, args=(request, i, return_dict))
# processes.append(process)
# process.start()

# # wait for all processes to complete
# for process in processes:
# process.join()
# pbar.update(1)

da = xr.DataArray(
data=np.empty((1, len(variables), len(self.CDS_LAT), len(self.CDS_LON))),
Expand Down Expand Up @@ -289,6 +303,7 @@ def _download_cds_grib_cached(
"day": time.day,
"time": time.strftime("%H:00"),
"format": "grib",
"download_format": "unarchived",
}
if dataset_name == "reanalysis-era5-pressure-levels":
rbody["pressure_level"] = level
Expand All @@ -304,14 +319,16 @@ def _download_cds_grib_cached(
break
elif reply["state"] in ("queued", "running"):
logger.debug(f"Request ID: {reply['request_id']}, sleeping")
sleep(1.0)
sleep(5.0)
elif reply["state"] in ("failed",):
logger.error(
f"CDS request fail for: {dataset_name} {variable} {level} {time}"
)
logger.error(f"Message: {reply['error'].get('message')}")
logger.error(f"Reason: {reply['error'].get('reason')}")
raise Exception("%s." % (reply["error"].get("message")))
else:
sleep(2.0)
# Download when ready
r.download(cache_path)

Expand All @@ -328,58 +345,3 @@ def cache(self) -> str:
cache_location, f"tmp_{DistributedManager().rank}"
)
return cache_location

@classmethod
def available(
cls,
time: datetime | np.datetime64,
) -> bool:
"""Checks if given date time is avaliable in the CDS with the pressure level
database
Parameters
----------
time : datetime | np.datetime64
Date time to access
Returns
-------
bool
If date time is avaiable
"""
if isinstance(time, np.datetime64): # np.datetime64 -> datetime
_unix = np.datetime64(0, "s")
_ds = np.timedelta64(1, "s")
time = datetime.utcfromtimestamp((time - _unix) / _ds)

client = cdsapi.Client(debug=False, quiet=True, wait_until_complete=False)
# Assemble request
r = client.retrieve(
"reanalysis-era5-single-levels",
{
"variable": "2m_temperature",
"product_type": "reanalysis",
"year": time.year,
"month": time.month,
"day": time.day,
"time": time.strftime("%H:00"),
"format": "grib",
},
)
# Queue request
while True:
r.update()
reply = r.reply
logger.debug(f"Request ID:{reply['request_id']}, state: {reply['state']}")
if reply["state"] == "completed":
break
elif reply["state"] in ("queued", "running"):
logger.debug(f"Request ID: {reply['request_id']}, sleeping")
sleep(0.5)
elif reply["state"] in ("failed",):
logger.error(f"CDS request fail for {time}")
logger.error(f"Message: {reply['error'].get('message')}")
logger.error(f"Reason: {reply['error'].get('reason')}")
return False

return True
8 changes: 8 additions & 0 deletions earth2studio/data/hrrr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib.util
import os
import pathlib
import shutil
Expand All @@ -39,6 +40,13 @@

class _HRRRBase:
def __init__(self, cache: bool = True, verbose: bool = True):
# Optional import not installed error
herbie = importlib.util.find_spec("herbie")
if herbie is None:
raise ImportError(
"herbie-data is not installed, install manually or using `pip install earth2studio[data]`"
)

self._cache = cache
self._verbose = verbose

Expand Down
13 changes: 11 additions & 2 deletions earth2studio/data/ifs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import shutil
from datetime import datetime

import ecmwf.opendata
try:
import ecmwf.opendata as opendata
except ImportError:
opendata = None
import numpy as np
import xarray as xr
from loguru import logger
Expand Down Expand Up @@ -73,9 +76,15 @@ class IFS:
IFS_LON = np.linspace(0, 359.75, 1440)

def __init__(self, cache: bool = True, verbose: bool = True):
# Optional import not installed error
if opendata is None:
raise ImportError(
"ecmwf-opendata is not installed, install manually or using `pip install earth2studio[data]`"
)

self._cache = cache
self._verbose = verbose
self.client = ecmwf.opendata.Client(source="azure")
self.client = opendata.Client(source="azure")

def __call__(
self,
Expand Down
2 changes: 2 additions & 0 deletions earth2studio/io/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ZarrBackend:
For keyword argument options see: https://zarr.readthedocs.io/en/stable/api/hierarchy.html
"""

# sphinx - io zarr start
def __init__(
self,
file_name: str = None,
Expand Down Expand Up @@ -104,6 +105,7 @@ def __iter__(
"""Return an iterator over Zarr Group member names."""
return self.root.__iter__()

# sphinx - io zarr end
def add_array(
self,
coords: CoordSystem,
Expand Down
8 changes: 7 additions & 1 deletion earth2studio/models/auto/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,13 @@ def open(self, file_path: str) -> io.BufferedReader:
},
tqdm_cls=TqdmFormat,
) as callback:
return self.fs.open(full_path, callback=callback)
try:
return self.fs.open(full_path, callback=callback)
except fsspec.exceptions.FSTimeoutError as e:
logger.error(
f"Package fetch timeout. Consider increasing timeout through environment variable 'EARTH2STUDIO_PACKAGE_TIMEOUT'. Currently {self.default_timeout()}s."
)
raise e

def resolve(self, file_path: str) -> str:
"""Resolves current relative file path to absolute path inside Package cache
Expand Down
Loading

0 comments on commit a03e2a4

Please sign in to comment.