Initial Commit For ERA5T Support
Darshan committed Feb 17, 2025
1 parent c64c50e commit f200bb9
Showing 36 changed files with 853 additions and 354 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ARG py_version=3.9
ARG py_version=3.8
FROM apache/beam_python${py_version}_sdk:2.40.0 as beam_sdk
FROM continuumio/miniconda3:latest

55 changes: 13 additions & 42 deletions README.md
@@ -746,7 +746,6 @@ This feature works in 4 parts.
1. Acquiring raw data from CDS, facilitated by [`weather-dl`](https://weather-tools.readthedocs.io/en/latest/weather_dl/README.html) tool.
2. Splitting raw data using [`weather-sp`](https://weather-tools.readthedocs.io/en/latest/weather_sp/README.html).
3. Ingest this split data into a Zarr file.
4. [**WIP**] Ingest [`AR`](gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3) data into BigQuery with the assistance of the [`weather-mv`](https://weather-tools.readthedocs.io/en/latest/weather_mv/README.html).

#### How to Run.
1. Set up a Cloud project with sufficient permissions to use cloud storage (such as [GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)).
@@ -756,26 +755,18 @@ This feature works in 4 parts.
3. Add all the `Copernicus` licenses into the [secret-manager](https://cloud.google.com/secret-manager) with a value like this: {"api_url": "URL", "api_key": "KEY"}
> NOTE: for every API_KEY there must be a unique secret key.
4. Update all of these variables in [docker-file](data_automate/Dockerfile).
4. Update all of these variables in [docker-file](deployment/Dockerfile).
* `PROJECT`
* `REGION`
* `BUCKET`
* `MANIFEST_LOCATION`
* `API_KEY_*`
* In case of multiple API keys, API_KEY must follow this format: `API_KEY_*`, where * can be a numeric value, e.g. 1, 2.
* The API_KEY_* value is the resource name of a [secret-manager key](https://cloud.google.com/secret-manager) and looks like this: ```projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1```
* `WEATHER_TOOLS_SDK_CONTAINER_IMAGE`
* Is made using this [dockerfile](https://github.com/google/weather-tools/blob/setuptools/Dockerfile) and is stored in a docker registry.
* `ARCO_ERA5_SDK_CONTAINER_IMAGE`
* `BQ_TABLES_LIST`
* `REGION_LIST`

> * In case of multiple API keys, API_KEY must follow this format: `API_KEY_*`, where * can be a numeric value, e.g. 1, 2.
> * The API_KEY_* value is the resource name of a [secret-manager key](https://cloud.google.com/secret-manager) and looks like this: ```projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1```
> * `BQ_TABLES_LIST` is a list of the BigQuery tables into which data is ingested; its value looks like this:
```'["PROJECT.DATASET.TABLE1", "PROJECT.DATASET.TABLE2", ..., "PROJECT.DATASET.TABLE6"]'```.
> * `REGION_LIST` is a list of the GCP regions in which the ingestion jobs will run; its value looks like this:
```'["us-east1", "us-west4",..., "us-west2"]'```.
> * The size of `BQ_TABLES_LIST` and `REGION_LIST` must be **6**, as a total of 6 Zarr files are processed in the current pipeline; BigQuery ingestion also corresponds to the `ZARR_FILES_LIST` of [raw-to-zarr-to-bq.py](/arco-era5/src/raw-to-zarr-to-bq.py), so add table names to `BQ_TABLES_LIST` accordingly.
> * `WEATHER_TOOLS_SDK_CONTAINER_IMAGE` is made using this [dockerfile](https://github.com/google/weather-tools/blob/setuptools/Dockerfile) and is stored in a Docker registry.
> * `ARCO_ERA5_SDK_CONTAINER_IMAGE` is made using this [dockerfile](https://github.com/google-research/arco-era5/blob/main/Dockerfile) and is stored in a registry.
* Is made using this [dockerfile](https://github.com/google-research/arco-era5/blob/main/Dockerfile) and is stored in a registry.


5. Create the Docker image.
@@ -787,37 +778,17 @@ export REPO=<repo> eg:arco-era5-raw-to-zarr-to-bq
gcloud builds submit . --tag "gcr.io/$PROJECT_ID/$REPO:latest"
```

7. Create a VM using the above-created Docker image
7. Run the script to create the Cloud Run jobs: [create_job](deployment/create_job.py)
```
export ZONE=<zone> eg: us-central1-a
export SERVICE_ACCOUNT=<service account> # Let's keep this as Compute Engine Default Service Account
export IMAGE_PATH=<container-image-path> # The above created image-path
gcloud compute instances create-with-container arco-era5-raw-to-zarr-to-bq \
--project=$PROJECT_ID \
--zone=$ZONE \
--machine-type=n2-standard-4 \
--network-interface=network-tier=PREMIUM,subnet=default \
--maintenance-policy=MIGRATE \
--provisioning-model=STANDARD \
--service-account=$SERVICE_ACCOUNT \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--image=projects/cos-cloud/global/images/cos-stable-109-17800-0-45 \
--boot-disk-size=200GB \
--boot-disk-type=pd-balanced \
--boot-disk-device-name=arco-era5-raw-to-zarr-to-bq \
--container-image=$IMAGE_PATH \
--container-restart-policy=on-failure \
--container-tty \
--no-shielded-secure-boot \
--shielded-vtpm \
--shielded-integrity-monitoring \
--labels=goog-ec-src=vm_add-gcloud,container-vm=cos-stable-109-17800-0-45 \
--metadata-from-file=startup-script=start-up.sh
python deployment/create_job.py
```

8. Once the VM is created, the script will execute on the `7th day of every month`, as this is the default set in the [cron-file](data_automate/cron-file). You can also see the logs after connecting to the VM through SSH.
> Logs will be written to this file: `/var/log/cron.log`.
> It is better to SSH 5-10 minutes after VM creation.
8. There will be 5 different Cloud Run jobs (a sketch for triggering one manually follows this list).
- `arco-era5-zarr-ingestion` - For Zarr data ingestion.
- `arco-era5t-daily-executor` - Triggers daily to process ERA5T daily data.
- `arco-era5t-monthly-executor` - Triggers monthly to process ERA5T monthly data.
- `arco-era5-sanity` - Sanity job to validate ERA5 vs. ERA5T data and replace it in case of differences.
- `arco-era5-executor` - Triggers every month to run a sanity job for every available Zarr.
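
Any of these jobs can also be triggered on demand. The following is a minimal sketch using the same `google-cloud-run` client that `deployment/create_job.py` relies on; the project and region values are placeholders, and only the job name comes from the list above:

```python
from google.cloud import run_v2


def trigger_job(project: str, region: str, job_id: str) -> None:
    """Start one execution of an existing Cloud Run job and wait for it to finish."""
    client = run_v2.JobsClient()
    name = f"projects/{project}/locations/{region}/jobs/{job_id}"
    operation = client.run_job(request=run_v2.RunJobRequest(name=name))
    execution = operation.result()  # blocks until the execution completes
    print(f"Finished execution: {execution.name}")


# Placeholder project/region; "arco-era5-zarr-ingestion" is one of the jobs listed above.
trigger_job("my-project", "us-central1", "arco-era5-zarr-ingestion")
```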
### Making the dataset "High Resolution" & beyond...

This phase of the project is under active development! If you would like to lend a hand in any way, please check out our
40 changes: 40 additions & 0 deletions deployment/Dockerfile
@@ -0,0 +1,40 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

FROM continuumio/miniconda3:latest

# Add the mamba solver for faster builds
RUN conda install -n base conda-libmamba-solver
RUN conda config --set solver libmamba

# Clone the weather-tools and create conda env using environment.yml of weather-tools.
ARG weather_tools_git_rev=main
RUN git clone https://github.com/google/weather-tools.git /weather
WORKDIR /weather
RUN git checkout "${weather_tools_git_rev}"
RUN rm -r /weather/weather_*/test_data/
RUN conda env create -f environment.yml --debug

# Activate the conda env and update the PATH
ARG CONDA_ENV_NAME=weather-tools
RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc
ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH
RUN pip install -e .

ARG arco_era5_git_rev=era5t-support
RUN git clone https://github.com/google-research/arco-era5.git /arco-era5
WORKDIR /arco-era5
RUN git checkout "${arco_era5_git_rev}"
RUN pip install -e .
111 changes: 111 additions & 0 deletions deployment/create_job.py
@@ -0,0 +1,111 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import run_v2
import constants


def create_job(job_id: str, template: run_v2.ExecutionTemplate):
# Create a client
client = run_v2.JobsClient()

# Initialize request argument(s)
job = run_v2.Job()
job.template = template

request = run_v2.CreateJobRequest(
parent=f"projects/{constants.PROJECT}/locations/{constants.REGION}",
job=job,
job_id=job_id,
)

# Make the request
operation = client.create_job(request=request)

print("Waiting for operation to complete...")

response = operation.result()

# Handle the response
print(response)


def graphnn_job_creator(
job_id: str,
timeout_sec: int,
container_args: list = None,
contaner_env: list = None,
container_memory_limit: str = None,
max_retries: int = 0
):
# Create an ExecutionTemplate
template = run_v2.ExecutionTemplate()
template.task_count = constants.TASK_COUNT

template.template.timeout = {"seconds": timeout_sec}
template.template.max_retries = max_retries

# Set container details
container = run_v2.Container()
container.name = constants.CONTAINER_NAME
container.image = constants.CLOUD_RUN_CONTAINER_IMAGE
container.command = constants.CONTAINER_COMMAND
container.args = container_args

# Set environment variables (example)
container.env = contaner_env

# Set resource limits (example)
container.resources.limits = {
"cpu": constants.CONTAINER_CPU_LIMIT,
"memory": container_memory_limit,
}

# Add the container to the template
template.template.containers.append(container)

create_job(job_id, template)


if __name__ == "__main__":
graphnn_job_creator(
job_id=constants.SANITY_JOB_ID,
timeout_sec=constants.TIMEOUT_SECONDS,
container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
)
graphnn_job_creator(
job_id=constants.INGESTION_JOB_ID,
timeout_sec=constants.TIMEOUT_SECONDS,
container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
)
graphnn_job_creator(
job_id=constants.DAILY_EXECUTOR_JOB_ID,
timeout_sec=constants.TIMEOUT_SECONDS,
container_args=constants.DAILY_EXECUTOR_JOB_CONTAINER_ARGS,
contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS,
container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
)
graphnn_job_creator(
job_id=constants.MONTHLY_EXECUTOR_JOB_ID,
timeout_sec=constants.TIMEOUT_SECONDS,
container_args=constants.MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS,
contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5T_API_KEYS,
container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
)
graphnn_job_creator(
job_id=constants.EXECUTOR_JOB_ID,
timeout_sec=constants.TIMEOUT_SECONDS,
container_args=constants.EXECUTOR_JOB_CONTAINER_ARGS,
contaner_env=constants.JOB_CONTAINER_ENV_VARIABLES + constants.ERA5_API_KEYS,
container_memory_limit=constants.CONTAINER_MEMORY_LIMIT,
)
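
Note that `create_job.py` imports a sibling `constants` module that is not part of this commit view. Purely as a hypothetical sketch inferred from the attribute names referenced above, `deployment/constants.py` might look roughly like this; every value below is an illustrative placeholder, not the repository's real configuration:

```python
# deployment/constants.py -- hypothetical sketch; all values are placeholders
PROJECT = "my-gcp-project"
REGION = "us-central1"

TASK_COUNT = 1
TIMEOUT_SECONDS = 86400  # 24 hours
CONTAINER_NAME = "arco-era5"
CLOUD_RUN_CONTAINER_IMAGE = "gcr.io/my-gcp-project/arco-era5-raw-to-zarr-to-bq:latest"
CONTAINER_COMMAND = ["python"]
CONTAINER_CPU_LIMIT = "8"
CONTAINER_MEMORY_LIMIT = "32Gi"

# Job IDs matching the Cloud Run jobs described in the README.
SANITY_JOB_ID = "arco-era5-sanity"
INGESTION_JOB_ID = "arco-era5-zarr-ingestion"
DAILY_EXECUTOR_JOB_ID = "arco-era5t-daily-executor"
MONTHLY_EXECUTOR_JOB_ID = "arco-era5t-monthly-executor"
EXECUTOR_JOB_ID = "arco-era5-executor"

# Shapes only -- the real args/env lists live in the repository.
DAILY_EXECUTOR_JOB_CONTAINER_ARGS = ["path/to/daily_executor_script.py"]
MONTHLY_EXECUTOR_JOB_CONTAINER_ARGS = ["path/to/monthly_executor_script.py"]
EXECUTOR_JOB_CONTAINER_ARGS = ["path/to/executor_script.py"]
JOB_CONTAINER_ENV_VARIABLES = [{"name": "PROJECT", "value": PROJECT}]
ERA5_API_KEYS = [{"name": "API_KEY_1", "value": "projects/PROJECT/secrets/KEY/versions/1"}]
ERA5T_API_KEYS = [{"name": "API_KEY_1", "value": "projects/PROJECT/secrets/KEY/versions/1"}]
```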
3 changes: 2 additions & 1 deletion environment.yml
@@ -15,8 +15,9 @@ name: era5
channels:
- conda-forge
dependencies:
- python=3.9.17
- python=3.8.19
- metview-batch==5.17.0
- pip:
- cython==0.29.34
- setuptools==70.3.0
- metview
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 2 additions & 1 deletion setup.py
@@ -129,7 +129,8 @@ def run(self):
'immutabledict',
'xarray-beam',
'scipy',
'fsspec==2023.4.0'
'fsspec==2023.4.0',
'google-cloud-run'
],
tests_require=['pytest'],
cmdclass={
9 changes: 6 additions & 3 deletions src/arco_era5/__init__.py
@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .constant import variables_full_names, zarr_files
from .constant import ARCO_ERA5_ZARR_FILES, variables_full_names, zarr_files
from .data_availability import check_data_availability
from .ingest_data_in_zarr import ingest_data_in_zarr_dataflow_job
from .download import raw_data_download_dataflow_job, data_splitting_dataflow_job
from .ingest_data_in_zarr import perform_data_operations
from .pangeo import run, parse_args
from .resize_zarr import resize_zarr_target, update_zarr_metadata
from .source_data import (
@@ -33,6 +34,7 @@
LoadTemporalDataForDateDoFn,
_read_nc_dataset
)
from .sanity import generate_raw_paths, OpenLocal, run_sanity_job, update_zarr
from .update_ar import UpdateSlice as ARUpdateSlice
from .update_co import GenerateOffset, UpdateSlice as COUpdateSlice, generate_input_paths
from .update_config_files import (
@@ -49,5 +51,6 @@
replace_non_alphanumeric_with_hyphen,
subprocess_run,
convert_to_date,
parse_arguments_raw_to_zarr_to_bq
parse_arguments_raw_to_zarr_to_bq,
ExecTypes
)
9 changes: 9 additions & 0 deletions src/arco_era5/constant.py
@@ -30,3 +30,12 @@
zarr_files = {'ml_wind': 'gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2/',
'ml_moisture': 'gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2/',
'sl_surface': 'gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2/'}

ARCO_ERA5_ZARR_FILES = {
"ar": "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
"ml_moisture": "gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2",
"ml_wind": "gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2",
"sl_forecast": "gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2",
"sl_reanalysis": "gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2",
"sl_surface": "gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2"
}
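
As an aside (not part of this commit), any store listed in `ARCO_ERA5_ZARR_FILES` can be opened lazily with xarray. A minimal sketch, assuming the `arco-era5` package plus `xarray`, `zarr`, and `gcsfs` are installed:

```python
import xarray as xr

from arco_era5 import ARCO_ERA5_ZARR_FILES  # exported via src/arco_era5/__init__.py

# Open the analysis-ready (AR) store lazily; no array data is read until accessed.
ds = xr.open_zarr(ARCO_ERA5_ZARR_FILES["ar"], chunks=None)
print(ds)  # lists the available variables, coordinates, and time range
```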
51 changes: 26 additions & 25 deletions src/arco_era5/data_availability.py
@@ -14,6 +14,7 @@
import datetime
import gcsfs
import logging
import os

import typing as t

@@ -22,17 +23,14 @@
SINGLE_LEVEL_VARIABLES,
MULTILEVEL_VARIABLES,
PRESSURE_LEVELS_GROUPS,
SINGLE_LEVEL_SUBDIR_TEMPLATE,
MULTILEVEL_SUBDIR_TEMPLATE
)
from .update_co import generate_input_paths
from .utils import ExecTypes

logger = logging.getLogger(__name__)

# File Templates
PRESSURELEVEL_DIR_TEMPLATE = (
"gs://gcp-public-data-arco-era5/raw/date-variable-pressure_level/{year:04d}/{month:02d}/{day:02d}/{chunk}/{pressure}.nc")
AR_SINGLELEVEL_DIR_TEMPLATE = (
"gs://gcp-public-data-arco-era5/raw/date-variable-single_level/{year:04d}/{month:02d}/{day:02d}/{chunk}/surface.nc")

# Data Chunks
MODEL_LEVEL_CHUNKS = ["dve", "tw", "o3q", "qrqs"]
SINGLE_LEVEL_CHUNKS = [
@@ -52,7 +50,22 @@
PRESSURE_LEVEL = PRESSURE_LEVELS_GROUPS["full_37"]


def check_data_availability(data_date_range: t.List[datetime.datetime]) -> bool:
def generate_input_paths_ar(data_date_range: t.List[datetime.datetime], root_path: str = GCP_DIRECTORY):
paths = []
for date in data_date_range:
for chunk in MULTILEVEL_VARIABLES + SINGLE_LEVEL_VARIABLES:
if chunk in MULTILEVEL_VARIABLES:
for pressure in PRESSURE_LEVEL:
relative_path = MULTILEVEL_SUBDIR_TEMPLATE.format(year=date.year, month=date.month, day=date.day, variable=chunk, pressure_level=pressure)
paths.append(os.path.join(root_path, relative_path))
else:
chunk = 'geopotential' if chunk == 'geopotential_at_surface' else chunk
relative_path = SINGLE_LEVEL_SUBDIR_TEMPLATE.format(year=date.year, month=date.month, day=date.day, variable=chunk)
paths.append(os.path.join(root_path, relative_path))
return paths


def check_data_availability(data_date_range: t.List[datetime.datetime], mode: str, root_path: str) -> bool:
"""Checks the availability of data for a given date range.
Args:
@@ -65,24 +78,12 @@ def check_data_availability(data_date_range: t.List[datetime.datetime]) -> bool:
fs = gcsfs.GCSFileSystem()
start_date = data_date_range[0].strftime("%Y/%m/%d")
end_date = data_date_range[-1].strftime("%Y/%m/%d")
all_uri = generate_input_paths(start_date, end_date, GCP_DIRECTORY, MODEL_LEVEL_CHUNKS)
all_uri.extend(generate_input_paths(start_date, end_date, GCP_DIRECTORY, SINGLE_LEVEL_CHUNKS, True))

for date in data_date_range:
for chunk in MULTILEVEL_VARIABLES + SINGLE_LEVEL_VARIABLES:
if chunk in MULTILEVEL_VARIABLES:
for pressure in PRESSURE_LEVEL:
all_uri.append(
PRESSURELEVEL_DIR_TEMPLATE.format(year=date.year,
month=date.month,
day=date.day, chunk=chunk,
pressure=pressure))
else:
if chunk == 'geopotential_at_surface':
chunk = 'geopotential'
all_uri.append(
AR_SINGLELEVEL_DIR_TEMPLATE.format(
year=date.year, month=date.month, day=date.day, chunk=chunk))
all_uri = []
if mode == ExecTypes.ERA5T_DAILY.value or mode == ExecTypes.ERA5.value:
all_uri.extend(generate_input_paths(start_date, end_date, root_path, MODEL_LEVEL_CHUNKS))
all_uri.extend(generate_input_paths_ar(data_date_range, root_path))
if mode == ExecTypes.ERA5.value or mode == ExecTypes.ERA5T_MONTHLY.value:
all_uri.extend(generate_input_paths(start_date, end_date, root_path, SINGLE_LEVEL_CHUNKS, True))

data_is_missing = False
for path in all_uri:

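Finally, a hedged usage sketch of the reworked `check_data_availability` shown above. The `mode` strings come from the `ExecTypes` enum referenced in this commit; the `root_path` is an assumed raw-data bucket prefix, and the returned flag follows the `data_is_missing` variable in the source:

```python
import datetime

from arco_era5 import ExecTypes, check_data_availability

# One day of ERA5T daily processing (illustrative date).
date_range = [datetime.datetime(2025, 1, 1)]

# root_path is an assumed raw NetCDF prefix; adjust to the actual bucket layout.
data_is_missing = check_data_availability(
    date_range,
    mode=ExecTypes.ERA5T_DAILY.value,
    root_path="gs://gcp-public-data-arco-era5/raw",
)
print("Some raw files are missing." if data_is_missing else "All raw files are present.")
```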