diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 0c30f45e..79f4e3c5 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -9,70 +9,41 @@ on: branches: [ "main" ] paths-ignore: - 'docs/**' + schedule: + - cron: '0 4 * * *' # run once a day at 4 AM env: PYTEST_ADDOPTS: "--color=yes" -# This our primary testing workflow. -# -# It also serves as an example for how to reference the separate prepare-env workflow. -# This workflow does not even attempt to build environments. It dispatches that resposibility -# to the prepare-env workflow and then loads the environments which that workflow has cached. -# -# Note that all of the steps listed under the comment ``# generic steps to load env from cache`` -# are required to load the environment. They include: -# - Generating the cache key -# - Actually loading the environment using the key -# - Adding the path ``/usr/share/miniconda3/envs/pangeo-forge-recipes/bin`` to $PATH - jobs: - prepare-env: - uses: ./.github/workflows/prepare-env.yaml run-tests: - needs: prepare-env runs-on: ubuntu-latest strategy: fail-fast: false matrix: python-version: ["3.9", "3.10", "3.11"] - dependencies: ["releases-only", "upstream-dev"] steps: - - uses: actions/checkout@v3 - - # generic steps to load env from cache - - name: 🎯 Set cache number - id: cache-number - # cache will last 3 days by default - run: echo CACHE_NUMBER=`expr $(date +'%j') / 3` >> $GITHUB_ENV - - name: 🎯 Set environment file - id: env-file - run: echo "env_file=ci/py${{ matrix.python-version }}.yml" >> $GITHUB_ENV - - uses: actions/cache@v3 - name: 🗃 Loaded Cached environment + - uses: actions/checkout@v4 + - name: 🔁 Setup Python + id: setup-python + uses: actions/setup-python@v4 with: - path: /usr/share/miniconda3/envs/pangeo-forge-recipes - key: ${{ runner.os }}-conda-${{ matrix.python-version }}-${{ hashFiles( env.env_file ) }}-${{ matrix.dependencies }}-${{ env.CACHE_NUMBER }} - id: conda-cache - - name: 🤿 Bail out if no cache hit - if: steps.conda-cache.outputs.cache-hit != 'true' - run: false - - name: 🎯 Set path to include conda python - run: echo "/usr/share/miniconda3/envs/pangeo-forge-recipes/bin" >> $GITHUB_PATH - - # custom testing steps unique to this workflow + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + - name: 🎯 Check cache hit + run: echo '${{ steps.setup-python.outputs.cache-hit }}' - name: 🌈 Install pangeo-forge-recipes package shell: bash -l {0} run: | - python -m pip install --no-deps -e . 
- - # workaround for https://github.com/conda-forge/apache-beam-feedstock/issues/67 - # this can be removed once that bug is fixed - - name: 🔧 Workaround broken beam conda release with pip install + python -m pip install -e ".[dev]" + - name: 🧑‍💻 On the nightly run, test upstream dev versions + if: | + github.event_name == 'schedule' shell: bash -l {0} run: | - python -m pip uninstall apache-beam -y - python -m pip install apache-beam - + python -m pip install -Ur ci/requirements-upstream-dev.txt + python -m pip install -U --pre apache-beam - name: 🏄‍♂️ Run Tests shell: bash -l {0} run: | @@ -82,6 +53,9 @@ jobs: --cov-report xml \ --durations=10 --durations-min=1.0 - name: 🚦 Run Codecov + if: | + github.event_name == 'push' || + github.event_name == 'pull_request' uses: codecov/codecov-action@v3.1.4 with: file: ./coverage.xml diff --git a/.github/workflows/prepare-env.yaml b/.github/workflows/prepare-env.yaml deleted file mode 100644 index be7367db..00000000 --- a/.github/workflows/prepare-env.yaml +++ /dev/null @@ -1,81 +0,0 @@ -name: Prepare env - -on: - workflow_call: - -# ** What is this this workflow? ** -# -# This resuable environment building and caching ("prepare-env") workflow adds some complexity -# to our CI stack, in exchange for optimizing for speed. Other workflows which rely on it can -# load cached environments without needing to build them from scratch. -# -# ** How do I use it? ** -# -# To load a cached environment built by this workflow in another workflow, please refer to -# main.yaml in this directory, which demonstrates its proper use. -# -# ** Details ** -# -# This step builds and caches the test environments using conda. -# If there is no cache, building each environment takes 3-4 minutes. -# If there is a cache this step should take about 40 seconds. -# (The time needed to install conda and check the cache.) -# -# The cache key is as follows: -# ${{ runner.os }}-conda-${{ matrix.python-version }}-${{ hashFiles( env.env_file ) }}-${{ env.CACHE_NUMBER }} -# -# A key parameter env.CACHE_NUMBER. This is a number that is set based on the current date: -# CACHE_NUMBER=`expr $(date +'%j') / 3` -# This is designed such that the cached environment will be reused for maximum 3 days. -# After 3 days, the CACHE_NUMBER will increment, forcing a new environment to be built. -# This cadence was chosen as a balance between making the tests go fast -# (avoiding rebuilding environments every run), and using a fresh, recent environment. 
-# -# The test environment is cached in /usr/share/miniconda3/envs/pangeo-forge-recipes - -jobs: - prepare-env: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ["3.9", "3.10", "3.11"] - dependencies: ["releases-only", "upstream-dev"] - steps: - - uses: actions/checkout@v3 - - name: 🔁 Setup Python - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - architecture: x64 - - name: 🔁 Setup Miniconda - uses: conda-incubator/setup-miniconda@v2 - with: - miniforge-variant: Mambaforge - miniforge-version: latest - activate-environment: pangeo-forge-recipes - use-mamba: true - - name: 🎯 Set cache number - id: cache-number - # cache will last 3 days by default - run: echo CACHE_NUMBER=`expr $(date +'%j') / 3` >> $GITHUB_ENV - - name: 🎯 Set environment file - id: env-file - run: echo "env_file=ci/py${{ matrix.python-version }}.yml" >> $GITHUB_ENV - - uses: actions/cache@v3 - name: 🗃 Cache environment - with: - path: /usr/share/miniconda3/envs/pangeo-forge-recipes - key: ${{ runner.os }}-conda-${{ matrix.python-version }}-${{ hashFiles( env.env_file ) }}-${{ matrix.dependencies }}-${{ env.CACHE_NUMBER }} - id: conda-cache - - name: 🐫 Maybe Update environment - if: steps.conda-cache.outputs.cache-hit != 'true' - run: mamba env update -n pangeo-forge-recipes -f ${{ env.env_file }} - - name: 🧑‍💻 Maybe update to upstream dev versions - if: matrix.dependencies == 'upstream-dev' - run: mamba env update -n pangeo-forge-recipes -f ci/upstream-dev.yml - - name: 🐍 List conda env - shell: bash -l {0} - run: | - conda info - conda list diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index a8d6faa7..da4c2781 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -9,7 +9,7 @@ jobs: deploy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python uses: actions/setup-python@v4 with: diff --git a/.github/workflows/slash-command-dispatch.yaml b/.github/workflows/slash-command-dispatch.yaml deleted file mode 100644 index 1992a7d9..00000000 --- a/.github/workflows/slash-command-dispatch.yaml +++ /dev/null @@ -1,16 +0,0 @@ -name: Slash Command Dispatch -on: - issue_comment: - types: [created] -jobs: - slash-command-dispatch: - runs-on: ubuntu-latest - steps: - - name: Slash Command Dispatch - uses: peter-evans/slash-command-dispatch@v3 - with: - token: ${{ secrets.ACTIONS_BOT_TOKEN }} - commands: run-test-tutorials - # anyone can trigger this command - permission: none - issue-type: pull-request diff --git a/.github/workflows/test-integration.yaml b/.github/workflows/test-integration.yaml index b87ee706..325a4b5c 100644 --- a/.github/workflows/test-integration.yaml +++ b/.github/workflows/test-integration.yaml @@ -11,7 +11,7 @@ env: PYTEST_ADDOPTS: "--color=yes" jobs: - prepare-env: + integration-tests: # run on: # - all pushes to specified branch(es) # - a PR was just labeled 'test-integration' @@ -20,44 +20,37 @@ jobs: github.event_name == 'push' || github.event.label.name == 'test-integration' || contains( github.event.pull_request.labels.*.name, 'test-integration') - uses: ./.github/workflows/prepare-env.yaml - integration-tests: - needs: prepare-env runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: ["3.9", "3.10", "3.11"] - dependencies: ["releases-only", "upstream-dev"] + # the runner versions tested here are pinned to an older version of apache-beam which is not + # necessarily expected to work on python > 
3.9. For one additional context point, see: + # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/540#issuecomment-1685096271 + # Once https://github.com/pangeo-forge/pangeo-forge-runner/pull/90 goes in, we can add back + # integration testing for 3.10 and 3.11 (for runner versions that follow that PR). + python-version: ["3.9"] # , "3.10", "3.11"] + runner-version: [ + "pangeo-forge-runner==0.8.0", + "git+https://github.com/pangeo-forge/pangeo-forge-runner.git@injections#egg=pangeo_forge_runner", + ] steps: - - uses: actions/checkout@v3 - - # generic steps to load env from cache - - name: 🎯 Set cache number - id: cache-number - # cache will last 3 days by default - run: echo CACHE_NUMBER=`expr $(date +'%j') / 3` >> $GITHUB_ENV - - name: 🎯 Set environment file - id: env-file - run: echo "env_file=ci/py${{ matrix.python-version }}.yml" >> $GITHUB_ENV - - uses: actions/cache@v3 - name: 🗃 Loaded Cached environment + - uses: actions/checkout@v4 + - name: 🔁 Setup Python + id: setup-python + uses: actions/setup-python@v4 with: - path: /usr/share/miniconda3/envs/pangeo-forge-recipes - key: ${{ runner.os }}-conda-${{ matrix.python-version }}-${{ hashFiles( env.env_file ) }}-${{ matrix.dependencies }}-${{ env.CACHE_NUMBER }} - id: conda-cache - - name: 🤿 Bail out if no cache hit - if: steps.conda-cache.outputs.cache-hit != 'true' - run: false - - name: 🎯 Set path to include conda python - run: echo "/usr/share/miniconda3/envs/pangeo-forge-recipes/bin" >> $GITHUB_PATH - - # custom testing steps unique to this workflow - - name: 🌈 Install pangeo-forge-recipes package + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + - name: 🎯 Check cache hit + run: echo '${{ steps.setup-python.outputs.cache-hit }}' + - name: 🌈 Install pangeo-forge-recipes & pangeo-forge-runner shell: bash -l {0} run: | - python -m pip install --no-deps -e . 
+ python -m pip install -e ".[dev]" + python -m pip install ${{ matrix.runner-version }} - name: 🏄‍♂️ Run Tests shell: bash -l {0} run: | - pytest --timeout=600 tests-integration/ -v + pytest --timeout=600 -vvxs tests/test_integration.py --run-integration diff --git a/.github/workflows/tutorials.yaml b/.github/workflows/tutorials.yaml deleted file mode 100644 index 6ad07054..00000000 --- a/.github/workflows/tutorials.yaml +++ /dev/null @@ -1,111 +0,0 @@ -name: Test Tutorial Notebooks - -on: - push: - branches: main - repository_dispatch: - types: [run-test-tutorials-command] - workflow_dispatch: - -env: - PYTEST_ADDOPTS: "--color=yes" - python-version: "3.9" - -jobs: - test-tutuorials: - name: 📓 Build Tutorial Notebook - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - include: - - nb-path: "hdf_reference/reference_cmip6.ipynb" - - nb-path: "xarray_zarr/cmip6-recipe.ipynb" - - nb-path: "xarray_zarr/multi_variable_recipe.ipynb" - - nb-path: "xarray_zarr/netcdf_zarr_sequential.ipynb" - - nb-path: "xarray_zarr/opendap_subset_recipe.ipynb" - - nb-path: "xarray_zarr/terraclimate.ipynb" - steps: - - uses: actions/checkout@v3 - if: ${{ github.event_name != 'repository_dispatch' }} - - uses: actions/checkout@v3 - if: ${{ github.event_name == 'repository_dispatch' }} - with: - fetch-depth: 0 - token: ${{ secrets.ACTIONS_BOT_TOKEN }} - repository: ${{ github.event.client_payload.pull_request.head.repo.full_name }} - ref: ${{ github.event.client_payload.pull_request.head.ref }} - - # start a check - - name: ✅ Register Check Started - id: start-check - uses: LouisBrunner/checks-action@v1.6.2 - if: ${{ github.event_name == 'repository_dispatch' }} - with: - sha: ${{ github.event.client_payload.pull_request.head.sha }} - # can't use default token - # https://github.community/t/create-a-check-run-details-url-is-not-being-set/166002/4?u=bkwhite - # https://github.com/LouisBrunner/checks-action/issues/18#issuecomment-970312052 - token: ${{ secrets.GITHUB_TOKEN }} - name: Test Notebook ${{ matrix.nb-path }} - status: in_progress - # this seems to be broken - details_url: https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} - - - name: 🔁 Setup Python - uses: actions/setup-python@v4 - with: - python-version: ${{ env.python-version }} - architecture: x64 - - name: 🔁 Setup Miniconda - uses: conda-incubator/setup-miniconda@v2 - with: - miniforge-variant: Mambaforge - miniforge-version: latest - activate-environment: pangeo-forge-recipes - use-mamba: true - - name: 🎯 Set cache number - id: cache-number - # cache will last 3 days by default - run: echo CACHE_NUMBER=`expr $(date +'%j') / 3` >> $GITHUB_ENV - - name: 🎯 Set environment file - id: env-file - run: echo "env_file=ci/py${{ env.python-version }}.yml" >> $GITHUB_ENV - - uses: actions/cache@v3 - name: 🗃 Cache environment - with: - path: /usr/share/miniconda3/envs/pangeo-forge-recipes - key: ${{ runner.os }}-conda-${{ env.python-version }}-${{ hashFiles( env.env_file ) }}-${{ env.CACHE_NUMBER }} - id: conda-cache - - name: 🐫 Update environment - run: mamba env update -n pangeo-forge-recipes -f ${{ env.env_file }} - if: steps.conda-cache.outputs.cache-hit != 'true' - - name: 🐍 List conda env - shell: bash -l {0} - run: | - conda info - conda list - - name: 🌈 Install pangeo-forge-recipes package - shell: bash -l {0} - run: | - python -m pip install --no-deps -e . - - # Everything above here is basically the same as main.yaml - # Could we consolidate it somehow? 
- - - name: 🏗 Build Notebook - shell: bash -l {0} - run: > - pytest --timeout=3000 --nbmake docs/pangeo_forge_recipes/tutorials/${{ matrix.nb-path }} - - # finish the check - # this won't work if we are not inside a repository_dispatch event, - # but I don't see how to combine multiple if statements - - uses: LouisBrunner/checks-action@v1.6.2 - if: ${{ always() && steps.start-check.outputs.check_id }} - with: - sha: ${{ github.event.client_payload.pull_request.head.sha }} - token: ${{ secrets.GITHUB_TOKEN }} - check_id: ${{ steps.start-check.outputs.check_id }} - status: completed - conclusion: ${{ job.status }} diff --git a/ci/requirements-upstream-dev.txt b/ci/requirements-upstream-dev.txt new file mode 100644 index 00000000..e8cd2421 --- /dev/null +++ b/ci/requirements-upstream-dev.txt @@ -0,0 +1,4 @@ +git+https://github.com/fsspec/filesystem_spec.git +git+https://github.com/pydata/xarray.git +git+https://github.com/fsspec/kerchunk.git +git+https://github.com/zarr-developers/zarr-python.git diff --git a/ci/upstream-dev.yml b/ci/upstream-dev.yml deleted file mode 100644 index b45977fa..00000000 --- a/ci/upstream-dev.yml +++ /dev/null @@ -1,9 +0,0 @@ -name: pangeo-forge-recipes - -dependencies: - - pip - - pip: - - "git+https://github.com/dask/dask.git" - - "git+https://github.com/fsspec/filesystem_spec.git" - - "git+https://github.com/pydata/xarray.git" - - "git+https://github.com/fsspec/kerchunk.git" diff --git a/examples/feedstock/gpcp_from_gcs.py b/examples/feedstock/gpcp_from_gcs.py new file mode 100644 index 00000000..caea9a11 --- /dev/null +++ b/examples/feedstock/gpcp_from_gcs.py @@ -0,0 +1,44 @@ +import apache_beam as beam +import pandas as pd +import zarr + +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern +from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr + +dates = [ + d.to_pydatetime().strftime("%Y%m%d") + for d in pd.date_range("1996-10-01", "1999-02-01", freq="D") +] + + +def make_url(time): + url_base = "https://storage.googleapis.com/pforge-test-data" + return f"{url_base}/gpcp/v01r03_daily_d{time}.nc" + + +concat_dim = ConcatDim("time", dates, nitems_per_file=1) +pattern = FilePattern(make_url, concat_dim) + + +def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: + # This fails integration test if not imported here + # TODO: see if --setup-file option for runner fixes this + import xarray as xr + + ds = xr.open_dataset(store, engine="zarr", chunks={}) + assert ds.title == ( + "Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3" + ) + return store + + +recipe = ( + beam.Create(pattern.items()) + | OpenURLWithFSSpec() + | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"}) + | StoreToZarr( + store_name="gpcp.zarr", + combine_dims=pattern.combine_dim_keys, + ) + | "Test dataset" >> beam.Map(test_ds) +) diff --git a/examples/feedstock/hrrr_kerchunk_concat_step.py b/examples/feedstock/hrrr_kerchunk_concat_step.py new file mode 100644 index 00000000..dc1b0dab --- /dev/null +++ b/examples/feedstock/hrrr_kerchunk_concat_step.py @@ -0,0 +1,63 @@ +"""Integration test for Pangeo Forge pipeline which creates a combined Kerchunk dataset from +HRRR data. 
Based on prior discussion and examples provided in: + - https://github.com/pangeo-forge/pangeo-forge-recipes/issues/387#issuecomment-1193514343 + - https://gist.github.com/darothen/5380e223ae5bc894006a5b6ed5a27cbb +""" + +import apache_beam as beam +import zarr + +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern +from pangeo_forge_recipes.transforms import ( + CombineReferences, + OpenWithKerchunk, + WriteCombinedReference, +) + +remote_protocol = "s3" +storage_options = {"anon": True} + + +def format_function(step: int) -> str: + url_template = "s3://noaa-hrrr-bdp-pds/hrrr.20220722/conus/hrrr.t22z.wrfsfcf{step:02d}.grib2" + return url_template.format(step=step) + + +pattern = FilePattern(format_function, ConcatDim("step", [0, 1, 2, 3]), file_type="grib") + +identical_dims = ["time", "surface", "latitude", "longitude", "y", "x"] +grib_filters = {"typeOfLevel": "surface", "shortName": "t"} + + +def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: + import xarray as xr + + ds = xr.open_dataset(store, engine="zarr", chunks={}) + ds = ds.set_coords(("latitude", "longitude")) + assert ds.attrs["centre"] == "kwbc" + assert len(ds["step"]) == 4 + assert len(ds["time"]) == 1 + assert "t" in ds.data_vars + for coord in ["time", "surface", "latitude", "longitude"]: + assert coord in ds.coords + return store + + +recipe = ( + beam.Create(pattern.items()) + | OpenWithKerchunk( + file_type=pattern.file_type, + remote_protocol=remote_protocol, + storage_options=storage_options, + kerchunk_open_kwargs={"filter": grib_filters}, + ) + | CombineReferences( + concat_dims=pattern.concat_dims, + identical_dims=identical_dims, + precombine_inputs=True, + ) + | WriteCombinedReference( + store_name="hrrr-concat-step", + ) + | "Test dataset" >> beam.Map(test_ds) +) diff --git a/examples/feedstock/hrrr_kerchunk_concat_valid_time.py b/examples/feedstock/hrrr_kerchunk_concat_valid_time.py new file mode 100644 index 00000000..b1be4328 --- /dev/null +++ b/examples/feedstock/hrrr_kerchunk_concat_valid_time.py @@ -0,0 +1,77 @@ +"""Integration test that replicates the results of the following tutorial: + +https://projectpythia.org/kerchunk-cookbook/notebooks/case_studies/HRRR.html +""" +from typing import Any + +import apache_beam as beam +import fsspec +import s3fs +import zarr + +from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence +from pangeo_forge_recipes.transforms import ( + CombineReferences, + OpenWithKerchunk, + WriteCombinedReference, +) + +storage_options = {"anon": True} +remote_protocol = "s3" +concat_dims = ["valid_time"] +identical_dims = ["latitude", "longitude", "heightAboveGround", "step"] +grib_filter = {"typeOfLevel": "heightAboveGround", "level": [2, 10]} + + +def drop_unknown(refs: dict[str, Any]): + for k in list(refs): + if k.startswith("unknown"): + refs.pop(k) + return refs + + +fs_read: s3fs.S3FileSystem = fsspec.filesystem( + remote_protocol, + skip_instance_cache=True, + **storage_options, +) +days_available = fs_read.glob("s3://noaa-hrrr-bdp-pds/hrrr.*") +files = fs_read.glob(f"s3://{days_available[-1]}/conus/*wrfsfcf01.grib2") +files = sorted(["s3://" + f for f in files]) +files = files[0:2] + +pattern: FilePattern = pattern_from_file_sequence( + files, + concat_dim=concat_dims[0], + file_type="grib", +) + + +def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: + import xarray as xr + + ds = xr.open_dataset(store, engine="zarr", chunks={}) + # TODO: more detailed asserts + assert "t2m" in ds.data_vars + + +recipe = ( + 
beam.Create(pattern.items())
+    | OpenWithKerchunk(
+        file_type=pattern.file_type,
+        inline_threshold=100,
+        remote_protocol=remote_protocol,
+        storage_options=storage_options,
+        kerchunk_open_kwargs={"filter": grib_filter},
+    )
+    | CombineReferences(
+        concat_dims=concat_dims,
+        identical_dims=identical_dims,
+        mzz_kwargs=dict(preprocess=drop_unknown),
+        precombine_inputs=True,
+    )
+    | WriteCombinedReference(
+        store_name="hrrr-concat-valid-time",
+    )
+    | "Test dataset" >> beam.Map(test_ds)
+)
diff --git a/examples/feedstock/meta.yaml b/examples/feedstock/meta.yaml
new file mode 100644
index 00000000..cb9f2943
--- /dev/null
+++ b/examples/feedstock/meta.yaml
@@ -0,0 +1,7 @@
+recipes:
+  - id: "gpcp-from-gcs"
+    object: "gpcp_from_gcs:recipe"
+  - id: "noaa-oisst"
+    object: "noaa_oisst:recipe"
+  - id: "terraclimate"
+    object: "terraclimate:recipe"
diff --git a/examples/feedstock/narr_opendap.py b/examples/feedstock/narr_opendap.py
new file mode 100644
index 00000000..7d0a499c
--- /dev/null
+++ b/examples/feedstock/narr_opendap.py
@@ -0,0 +1,72 @@
+"""NARR: Subsetting and OPeNDAP
+
+This tutorial uses data from NOAA's North American Regional Reanalysis (NARR). From
+https://www.ncei.noaa.gov/products/weather-climate-models/north-american-regional:
+
+    The North American Regional Reanalysis (NARR) is a model produced by the National Centers
+    for Environmental Prediction (NCEP) that generates reanalyzed data for temperature, wind,
+    moisture, soil, and dozens of other parameters. The NARR model assimilates a large amount
+    of observational data from a variety of sources to produce a long-term picture of weather
+    over North America.
+
+For this recipe, we will access the data via OPeNDAP
+(https://earthdata.nasa.gov/collaborate/open-data-services-and-software/api/opendap), a widely-used
+API for remote access of environmental data over HTTP. A key point is that, since we are using
+OPeNDAP, _there are no input files to download / cache_. We open the data directly from the remote
+server.
+
+The data we will use are cataloged here (3D data on pressure levels):
+https://psl.noaa.gov/thredds/catalog/Datasets/NARR/pressure/catalog.html
+"""
+import apache_beam as beam
+import xarray as xr
+import zarr
+
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
+from pangeo_forge_recipes.transforms import Indexed, OpenWithXarray, StoreToZarr
+
+time_dim = ConcatDim("time", ["197901"])
+
+
+def format_function(time):
+    return f"https://psl.noaa.gov/thredds/dodsC/Datasets/NARR/pressure/air.{time}.nc"
+
+
+pattern = FilePattern(format_function, time_dim, file_type="opendap")
+
+
+class SetProjectionAsCoord(beam.PTransform):
+    """A preprocessing transform which assigns the `Lambert_Conformal`
+    variable as a coordinate variable.
+ """ + + @staticmethod + def _set_projection_as_coord(item: Indexed[xr.Dataset]) -> Indexed[xr.Dataset]: + index, ds = item + ds = ds.set_coords(["Lambert_Conformal"]) + return index, ds + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return pcoll | beam.Map(self._set_projection_as_coord) + + +def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: + # This fails integration test if not imported here + # TODO: see if --setup-file option for runner fixes this + import xarray as xr + + ds = xr.open_dataset(store, engine="zarr", chunks={}) + assert "air" in ds.data_vars + + +recipe = ( + beam.Create(pattern.items()) + | OpenWithXarray(file_type=pattern.file_type) + | SetProjectionAsCoord() + | StoreToZarr( + store_name="narr.zarr", + combine_dims=pattern.combine_dim_keys, + target_chunks={"time": 1}, + ) + | "Test dataset" >> beam.Map(test_ds) +) diff --git a/examples/feedstock/noaa_oisst.py b/examples/feedstock/noaa_oisst.py new file mode 100644 index 00000000..7efa8de6 --- /dev/null +++ b/examples/feedstock/noaa_oisst.py @@ -0,0 +1,44 @@ +import apache_beam as beam +import pandas as pd +import zarr + +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern +from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr + +dates = pd.date_range("1981-09-01", "2022-02-01", freq="D") + +URL_FORMAT = ( + "https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/" + "v2.1/access/avhrr/{time:%Y%m}/oisst-avhrr-v02r01.{time:%Y%m%d}.nc" +) + + +def make_url(time): + return URL_FORMAT.format(time=time) + + +time_concat_dim = ConcatDim("time", dates, nitems_per_file=1) +pattern = FilePattern(make_url, time_concat_dim) + + +def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: + # This fails integration test if not imported here + # TODO: see if --setup-file option for runner fixes this + import xarray as xr + + ds = xr.open_dataset(store, engine="zarr", chunks={}) + for var in ["anom", "err", "ice", "sst"]: + assert var in ds.data_vars + return store + + +recipe = ( + beam.Create(pattern.items()) + | OpenURLWithFSSpec() + | OpenWithXarray(file_type=pattern.file_type) + | StoreToZarr( + store_name="noaa-oisst.zarr", + combine_dims=pattern.combine_dim_keys, + ) + | "Test dataset" >> beam.Map(test_ds) +) diff --git a/examples/feedstock/terraclimate.py b/examples/feedstock/terraclimate.py new file mode 100644 index 00000000..197ef1a6 --- /dev/null +++ b/examples/feedstock/terraclimate.py @@ -0,0 +1,239 @@ +"""From http://www.climatologylab.org/terraclimate.html: + + TerraClimate is a dataset of monthly climate and climatic water balance for global terrestrial + surfaces from 1958-2019. These data provide important inputs for ecological and hydrological + studies at global scales that require high spatial resolution and time-varying data. All data + have monthly temporal resolution and a ~4-km (1/24th degree) spatial resolution. The data cover + the period from 1958-2019. We plan to update these data periodically (annually). + +This is an advanced example that illustrates the following concepts +- **Multiple variables in different files**: One file per year for a dozen different variables. +- **Complex preprocessing**: We want to apply different preprocessing depending on the variable. 
+""" +import apache_beam as beam +import xarray as xr +import zarr + +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim +from pangeo_forge_recipes.transforms import Indexed, OpenURLWithFSSpec, OpenWithXarray, StoreToZarr + +# for the example, we only select two years to keep the example small; +# this time range can be extended if you are running the recipe yourself. +years = list(range(2000, 2002)) + +# even when subsetting to just two years of data, including every variable results +# in a dataset size of rougly 3-3.5 GB. this is a bit large to run for the example. +# to keep the example efficient, we select two of the available variables. to run +# more variables yourself, simply uncomment any/all of the commented variables below. +variables = [ + # "aet", + # "def", + # "pet", + # "ppt", + # "q", + "soil", + "srad", + # "swe", + # "tmax", + # "tmin", + # "vap", + # "ws", + # "vpd", + # "PDSI", +] + + +def make_filename(variable, time): + return ( + "http://thredds.northwestknowledge.net:8080/thredds/fileServer/" + f"TERRACLIMATE_ALL/data/TerraClimate_{variable}_{time}.nc" + ) + + +pattern = FilePattern( + make_filename, ConcatDim(name="time", keys=years), MergeDim(name="variable", keys=variables) +) + + +class Munge(beam.PTransform): + """ + Apply cleaning transformations to Datasets + """ + + @staticmethod + def _apply_mask(key, da): + """helper function to mask DataArrays based on a threshold value""" + mask_opts = { + "PDSI": ("lt", 10), + "aet": ("lt", 32767), + "def": ("lt", 32767), + "pet": ("lt", 32767), + "ppt": ("lt", 32767), + "ppt_station_influence": None, + "q": ("lt", 2147483647), + "soil": ("lt", 32767), + "srad": ("lt", 32767), + "swe": ("lt", 10000), + "tmax": ("lt", 200), + "tmax_station_influence": None, + "tmin": ("lt", 200), + "tmin_station_influence": None, + "vap": ("lt", 300), + "vap_station_influence": None, + "vpd": ("lt", 300), + "ws": ("lt", 200), + } + if mask_opts.get(key, None): + op, val = mask_opts[key] + if op == "lt": + da = da.where(da < val) + elif op == "neq": + da = da.where(da != val) + return da + + def _preproc(self, item: Indexed[xr.Dataset]) -> Indexed[xr.Dataset]: + """custom preprocessing function for terraclimate data""" + import xarray as xr + + index, ds = item + + # invalid unicode in source data. This attr replacement is a fix. + # FIXME: use lighter solution from: + # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/586 + fixed_attrs = { + "method": ( + "These layers from TerraClimate were derived from the essential climate variables " + "of TerraClimate. Water balance variables, actual evapotranspiration, climatic " + "water deficit, runoff, soil moisture, and snow water equivalent were calculated " + "using a water balance model and plant extractable soil water capacity derived " + "from Wang-Erlandsson et al (2016)." + ), + "title": ( + "TerraClimate: monthly climate and climatic water balance for global land surfaces" + ), + "summary": ( + "This archive contains a dataset of high-spatial resolution (1/24th degree, ~4-km) " + "monthly climate and climatic water balance for global terrestrial surfaces from " + "1958-2015. These data were created by using climatically aided interpolation, " + "combining high-spatial resolution climatological normals from the WorldClim " + "version 1.4 and version 2 datasets, with coarser resolution time varying " + "(i.e. 
monthly) data from CRU Ts4.0 and JRA-55 to produce a monthly dataset of " + "precipitation, maximum and minimum temperature, wind speed, vapor pressure, and " + "solar radiation. TerraClimate additionally produces monthly surface water balance " + "datasets using a water balance model that incorporates reference " + "evapotranspiration, precipitation, temperature, and interpolated plant " + "extractable soil water capacity." + ), + "keywords": ( + "WORLDCLIM,global,monthly, temperature,precipitation,wind,radiation,vapor " + "pressure, evapotranspiration,water balance,soil water capacity,snow water " + "equivalent,runoff" + ), + "id": "Blank", + "naming_authority": "edu.uidaho.nkn", + "keywords_vocabulary": "None", + "cdm_data_type": "GRID", + "history": "Created by John Abatzoglou, University of California Merced", + "date_created": "2021-04-22", + "creator_name": "John Abatzoglou", + "creator_url": "http://climate.nkn.uidaho.edu/TerraClimate", + "creator_role": "Principal Investigator", + "creator_email": "jabatzoglou@ucmerced.edu", + "institution": "University of California Merced", + "project": "Global Dataset of Monthly Climate and Climatic Water Balance (1958-2015)", + "processing_level": "Gridded Climate Projections", + "acknowledgment": ( + "Please cite the references included herein. We also acknowledge the WorldClim " + "datasets (Fick and Hijmans, 2017; Hijmans et al., 2005) and the CRU Ts4.0 " + "(Harris et al., 2014) and JRA-55 (Kobayashi et al., 2015) datasets." + ), + "geospatial_lat_min": -89.979164, + "geospatial_lat_max": 89.979164, + "geospatial_lon_min": -179.97917, + "geospatial_lon_max": 179.97917, + "geospatial_vertical_min": 0.0, + "geospatial_vertical_max": 0.0, + "time_coverage_start": "1958-01-01T00:0", + "time_coverage_end": "1958-12-01T00:0", + "time_coverage_duration": "P1Y", + "time_coverage_resolution": "P1M", + "standard_nam_vocabulary": "CF-1.0", + "license": "No restrictions", + "contributor_name": "Katherine Hegewisch", + "contributor_role": "Postdoctoral Fellow", + "contributor_email": "khegewisch@ucmerced.edu", + "publisher_name": "Northwest Knowledge Network", + "publisher_url": "http://www.northwestknowledge.net", + "publisher_email": "info@northwestknowledge.net", + "date_modified": "2021-04-22", + "date_issued": "2021-04-22", + "geospatial_lat_units": "decimal degrees north", + "geospatial_lat_resolution": -0.041666668, + "geospatial_lon_units": "decimal degrees east", + "geospatial_lon_resolution": 0.041666668, + "geospatial_vertical_units": "None", + "geospatial_vertical_resolution": 0.0, + "geospatial_vertical_positive": "Up", + "references": ( + "Abatzoglou, J.T., S.Z. Dobrowski, S.A. Parks, and K.C. Hegewisch, 2017, " + "High-resolution global dataset of monthly climate and climatic water " + "balance from 1958-2015, submitted to Scientific Data." 
+ ), + "source": "WorldClim v2.0 (2.5m), CRU Ts4.0, JRA-55", + "version": "v1.0", + "Conventions": "CF-1.6", + } + ds.attrs = fixed_attrs + + rename = {} + + station_influence = ds.get("station_influence", None) + + if station_influence is not None: + ds = ds.drop_vars("station_influence") + + var = list(ds.data_vars)[0] + + rename_vars = {"PDSI": "pdsi"} + + if var in rename_vars: + rename[var] = rename_vars[var] + + if "day" in ds.coords: + rename["day"] = "time" + + if station_influence is not None: + ds[f"{var}_station_influence"] = station_influence + with xr.set_options(keep_attrs=True): + ds[var] = self._apply_mask(var, ds[var]) + if rename: + ds = ds.rename(rename) + return index, ds + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return pcoll | beam.Map(self._preproc) + + +def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: + # This fails integration test if not imported here + # TODO: see if --setup-file option for runner fixes this + import xarray as xr + + ds = xr.open_dataset(store, engine="zarr", chunks={}) + assert "soil" in ds.data_vars + assert "srad" in ds.data_vars + assert len(ds.time) == 24 + + +recipe = ( + beam.Create(pattern.items()) + | OpenURLWithFSSpec() + | OpenWithXarray(file_type=pattern.file_type) + | Munge() # Custom pre-processor + | StoreToZarr( + store_name="terraclimate.zarr", + combine_dims=pattern.combine_dim_keys, + target_chunks={"lat": 1024, "lon": 1024, "time": 12}, + ) + | "Test dataset" >> beam.Map(test_ds) +) diff --git a/examples/runner-commands/bake.sh b/examples/runner-commands/bake.sh new file mode 100644 index 00000000..14bf24d4 --- /dev/null +++ b/examples/runner-commands/bake.sh @@ -0,0 +1 @@ +pangeo-forge-runner bake --repo=$REPO -f=$CONFIG_FILE --Bake.recipe_id=$RECIPE_ID --Bake.job_name=$JOB_NAME --prune diff --git a/examples/runner-config/local.json b/examples/runner-config/local.json new file mode 100644 index 00000000..a8a36fa5 --- /dev/null +++ b/examples/runner-config/local.json @@ -0,0 +1,13 @@ +{ + "Bake": { + "bakery_class": "pangeo_forge_runner.bakery.local.LocalDirectBakery" + }, + "TargetStorage": { + "fsspec_class": "fsspec.implementations.local.LocalFileSystem", + "root_path": "./target" + }, + "InputCacheStorage": { + "fsspec_class": "fsspec.implementations.local.LocalFileSystem", + "root_path": "./cache" + } +} diff --git a/examples/runner-config/local.py b/examples/runner-config/local.py new file mode 100644 index 00000000..18d2f5d5 --- /dev/null +++ b/examples/runner-config/local.py @@ -0,0 +1,9 @@ +# mypy: disable-error-code="name-defined" + +c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery" + +c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem" +c.TargetStorage.root_path = "./target" + +c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem" +c.InputCacheStorage.root_path = "./cache" diff --git a/pyproject.toml b/pyproject.toml index 5840db0e..5e91a5ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,30 +25,27 @@ classifiers = [ license = { text = "Apache-2.0" } keywords = ["pangeo", "data"] dependencies = [ - "setuptools", - "dask >= 2021.11.2", - "distributed >= 2021.11.2", + "apache-beam", "cftime", + "dask >= 2021.11.2", + "fsspec[http] >= 2021.6.0", + "h5netcdf", "h5py >= 3.3.0", - "intake >= 0.6.4", - "intake-xarray >= 0.4.1", - "xarray >=0.18.0", + "kerchunk >= 0.0.7", "netcdf4", - "h5netcdf", - "zarr >= 2.12.0", "numcodecs >= 0.9.0", - "fsspec[http] >= 2021.6.0", - "kerchunk >= 0.0.7", - 
"mypy_extensions >= 0.4.2" + "xarray >= 0.18.0", + "zarr >= 2.12.0", ] [project.optional-dependencies] dev = [ "click", "pytest", - "pytest-sugar", "pytest-cov", "pytest-lazy-fixture", + "pytest-sugar", + "pytest-timeout", "scipy" ] @@ -62,12 +59,15 @@ Documentation = "https://pangeo-forge.readthedocs.io/en/latest/" write_to = "pangeo_forge_recipes/_version.py" write_to_template = "__version__ = '{version}'" +[tool.setuptools.packages.find] +exclude = ["docs_src"] + [tool.black] line-length = 100 [tool.isort] known_first_party = "pangeo_forge_recipes" -known_third_party = ["aiohttp", "apache_beam", "cftime", "click", "dask", "fsspec", "kerchunk", "numpy", "pandas", "pytest", "pytest_lazyfixture", "s3fs", "setuptools", "ujson", "xarray", "zarr"] +known_third_party = ["aiohttp", "apache_beam", "cftime", "click", "dask", "fsspec", "kerchunk", "numpy", "pandas", "pytest", "pytest_lazyfixture", "s3fs", "xarray", "zarr"] multi_line_output = 3 include_trailing_comma = true force_grid_wrap = 0 @@ -78,8 +78,3 @@ line_length = 100 log_cli = false timeout = 30 timeout_method = "signal" -addopts = "--ignore-glob=tests/recipe_tests/*" -markers = [ - "no_executor: Tests that do not use an executor fixture.", - "executor_function: Tests that use the Python function executor.", -] diff --git a/setup.cfg b/setup.cfg index eb8fdb5c..1cb951fc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,20 +2,5 @@ [flake8] max-line-length = 100 - -# remove this once rechunker executors are factored into a standalone package -# that exports type hints (https://mypy.readthedocs.io/en/latest/installed_packages.html#installed-packages) - -[mypy] -show_error_codes = True - -[mypy-dask.*] -ignore_missing_imports = True -[mypy-rechunker.*] -ignore_missing_imports = True -[mypy-fsspec.*] -ignore_missing_imports = True -[mypy-zarr.*] -ignore_missing_imports = True -[mypy-yaml.*] -ignore_missing_imports = True +per-file-ignores = + examples/runner-config/local.py:F821 diff --git a/setup.py b/setup.py deleted file mode 100644 index a56b0abd..00000000 --- a/setup.py +++ /dev/null @@ -1,4 +0,0 @@ -# shim for editable installs -from setuptools import setup - -setup() diff --git a/tests-integration/test_hrrr_kerchunk_concat_step.py b/tests-integration/test_hrrr_kerchunk_concat_step.py deleted file mode 100644 index 2de647d4..00000000 --- a/tests-integration/test_hrrr_kerchunk_concat_step.py +++ /dev/null @@ -1,95 +0,0 @@ -"""Integration test for Pangeo Forge pipeline which creates a combined Kerchunk dataset from -HRRR data. Based on prior discussion and examples provided in: - - https://github.com/pangeo-forge/pangeo-forge-recipes/issues/387#issuecomment-1193514343 - - https://gist.github.com/darothen/5380e223ae5bc894006a5b6ed5a27cbb - -This module can be run with pytest: - ``` - pytest tests-integration/test_hrrr_kerchunk_concat_step.py - ``` -The use of pytest-specific fixturing here is deliberately minimal, so that this module can be -easily adapted for other real-world HRRR use cases. 
-""" - -import json -import os -from tempfile import TemporaryDirectory - -import apache_beam as beam -import fsspec -import xarray as xr -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.testing.test_pipeline import TestPipeline -from fsspec.implementations.reference import ReferenceFileSystem - -from pangeo_forge_recipes.patterns import ConcatDim, FilePattern -from pangeo_forge_recipes.transforms import ( - CombineReferences, - OpenWithKerchunk, - WriteCombinedReference, -) - -remote_protocol = "s3" -storage_options = {"anon": True} - - -def format_function(step: int) -> str: - url_template = "s3://noaa-hrrr-bdp-pds/hrrr.20220722/conus/hrrr.t22z.wrfsfcf{step:02d}.grib2" - return url_template.format(step=step) - - -pattern = FilePattern(format_function, ConcatDim("step", [0, 1, 2, 3]), file_type="grib") - -identical_dims = ["time", "surface", "latitude", "longitude", "y", "x"] -grib_filters = {"typeOfLevel": "surface", "shortName": "t"} - -td = TemporaryDirectory() -store_name = "grib-test-store" -options = PipelineOptions(runtime_type_check=False) -with TestPipeline(options=options) as p: - ( - p - | beam.Create(pattern.items()) - | OpenWithKerchunk( - file_type=pattern.file_type, - remote_protocol=remote_protocol, - storage_options=storage_options, - kerchunk_open_kwargs={"filter": grib_filters}, - ) - | CombineReferences( - concat_dims=pattern.concat_dims, - identical_dims=identical_dims, - precombine_inputs=True, - ) - | WriteCombinedReference( - target_root=td.name, - store_name=store_name, - ) - ) - -full_path = os.path.join(td.name, store_name, "reference.json") - -with open(full_path) as f: - fs: ReferenceFileSystem = fsspec.filesystem( - "reference", - fo=json.load(f), - remote_protocol=remote_protocol, - remote_options=storage_options, - ) - -ds = xr.open_dataset( - fs.get_mapper(""), - engine="zarr", - backend_kwargs=dict(consolidated=False), - chunks={"step": 1}, -) -ds = ds.set_coords(("latitude", "longitude")) - - -def test_ds(): - assert ds.attrs["centre"] == "kwbc" - assert len(ds["step"]) == 4 - assert len(ds["time"]) == 1 - assert "t" in ds.data_vars - for coord in ["time", "surface", "latitude", "longitude"]: - assert coord in ds.coords diff --git a/tests-integration/test_hrrr_kerchunk_concat_valid_time.py b/tests-integration/test_hrrr_kerchunk_concat_valid_time.py deleted file mode 100644 index e9a85929..00000000 --- a/tests-integration/test_hrrr_kerchunk_concat_valid_time.py +++ /dev/null @@ -1,203 +0,0 @@ -"""Integration test that replicates the results of the following tutorial: - -https://projectpythia.org/kerchunk-cookbook/notebooks/case_studies/HRRR.html - -To verify that dataset built by the Pangeo Forge pipeline is correct, this test replicates -the dataset produced by the above-linked tutorial using the same methods demonstrated therein, -and then compares the output of the Pangeo Forge pipeline to this expected output. 
-""" - -import glob -import os -from typing import Any - -import apache_beam as beam -import fsspec -import pytest -import s3fs -import ujson # type: ignore -import xarray as xr -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.testing.test_pipeline import TestPipeline -from fsspec.implementations.reference import ReferenceFileSystem -from kerchunk.combine import MultiZarrToZarr -from kerchunk.grib2 import scan_grib - -from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence -from pangeo_forge_recipes.transforms import ( - CombineReferences, - OpenWithKerchunk, - WriteCombinedReference, -) - - -def open_reference_ds(multi_kerchunk: dict, remote_protocol: str, remote_options: dict): - # open dataset as zarr object using fsspec reference file system and Xarray - fs: ReferenceFileSystem = fsspec.filesystem( - "reference", - fo=multi_kerchunk, - remote_protocol=remote_protocol, - remote_options=remote_options, - ) - m = fs.get_mapper("") - ds = xr.open_dataset( - m, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={"valid_time": 1} - ) - return ds - - -@pytest.fixture() -def tmp_target_url(tmpdir_factory): - return str(tmpdir_factory.mktemp("target")) - - -@pytest.fixture -def remote_protocol() -> str: - return "s3" - - -@pytest.fixture -def storage_options(): - return {"anon": True} - - -@pytest.fixture -def grib_filters() -> dict: - return {"typeOfLevel": "heightAboveGround", "level": [2, 10]} - - -@pytest.fixture -def concat_dims() -> list[str]: - return ["valid_time"] - - -@pytest.fixture -def identical_dims() -> list[str]: - return ["latitude", "longitude", "heightAboveGround", "step"] - - -@pytest.fixture -def inline_threshold() -> int: - return 100 - - -def drop_unknown(refs: dict[str, Any]): - for k in list(refs): - if k.startswith("unknown"): - refs.pop(k) - return refs - - -@pytest.fixture -def src_files(remote_protocol, storage_options) -> list[str]: - fs_read: s3fs.S3FileSystem = fsspec.filesystem( - remote_protocol, - skip_instance_cache=True, - **storage_options, - ) - days_available = fs_read.glob("s3://noaa-hrrr-bdp-pds/hrrr.*") - files = fs_read.glob(f"s3://{days_available[-1]}/conus/*wrfsfcf01.grib2") - files = sorted(["s3://" + f for f in files]) - return files[0:2] - - -@pytest.fixture -def vanilla_kerchunk_ds( - tmpdir_factory, - src_files: list[str], - inline_threshold: int, - remote_protocol: str, - storage_options: dict, - grib_filters: dict, - concat_dims: list[str], - identical_dims: list[str], -): - """Based on https://projectpythia.org/kerchunk-cookbook/notebooks/case_studies/HRRR.html""" - - temp_dir = str(tmpdir_factory.mktemp("target")) - for url in src_files: - out = scan_grib( - url, - storage_options=storage_options, - inline_threshold=inline_threshold, - filter=grib_filters, - ) - for msg_number, msg in enumerate(out): - date = url.split("/")[3].split(".")[1] - name = url.split("/")[5].split(".")[1:3] - out_file_name = f"{temp_dir}/{date}_{name[0]}_{name[1]}_message{msg_number}.json" - with open(out_file_name, "w") as f: - f.write(ujson.dumps(msg)) - - output_files = glob.glob(f"{temp_dir}/*.json") - - # Combine individual references into single consolidated reference - mzz = MultiZarrToZarr( - output_files, - concat_dims=concat_dims, - identical_dims=identical_dims, - preprocess=drop_unknown, - ) - multi_kerchunk = mzz.translate() - ds = open_reference_ds(multi_kerchunk, remote_protocol, storage_options) - return ds - - -@pytest.fixture -def pipeline(): - options = 
PipelineOptions(runtime_type_check=False) - with TestPipeline(options=options) as p: - yield p - - -@pytest.fixture -def pangeo_forge_ds( - src_files, - pipeline, - concat_dims: list[str], - identical_dims: list[str], - tmp_target_url: str, - inline_threshold: int, - remote_protocol: str, - storage_options: dict, - grib_filters: dict, -): - """Aims to create the same dataset as `vanilla_kerchunk_ds` fixture, but with Pangeo Forge.""" - - pattern: FilePattern = pattern_from_file_sequence( - src_files, - concat_dim=concat_dims[0], - file_type="grib", - ) - store_name = "grib-test-store" - with pipeline as p: - ( - p - | beam.Create(pattern.items()) - | OpenWithKerchunk( - file_type=pattern.file_type, - inline_threshold=inline_threshold, - remote_protocol=remote_protocol, - storage_options=storage_options, - kerchunk_open_kwargs={"filter": grib_filters}, - ) - | CombineReferences( - concat_dims=concat_dims, - identical_dims=identical_dims, - mzz_kwargs=dict(preprocess=drop_unknown), - precombine_inputs=True, - ) - | WriteCombinedReference( - target_root=tmp_target_url, - store_name=store_name, - ) - ) - full_path = os.path.join(tmp_target_url, store_name, "reference.json") - with open(full_path) as f: - multi_kerchunk = ujson.load(f) - ds = open_reference_ds(multi_kerchunk, remote_protocol, storage_options) - return ds - - -def test_multi_message_grib(vanilla_kerchunk_ds, pangeo_forge_ds): - xr.testing.assert_equal(vanilla_kerchunk_ds, pangeo_forge_ds) diff --git a/tests/conftest.py b/tests/conftest.py index 975dc8dd..bf0238df 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,19 +40,21 @@ from .data_generation import make_ds -# Helper functions -------------------------------------------------------------------------------- - -# to use this feature, e.g. -# $ pytest --redirect-dask-worker-logs-to-stdout=DEBUG +# To use this feature, e.g. `$ pytest --run-integration` +# https://jwodder.github.io/kbits/posts/pytest-mark-off/ def pytest_addoption(parser): parser.addoption( - "--redirect-dask-worker-logs-to-stdout", - action="store", - default="NOTSET", + "--run-integration", + action="store_true", + default=False, + help="Run integration tests. Skipped by default because they are slow-running.", ) +# Helper functions -------------------------------------------------------------------------------- + + def split_up_files_by_day(ds, day_param): gb = ds.resample(time=day_param) _, datasets = zip(*gb) diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index 2260d80b..d6f78222 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -1,3 +1,4 @@ +import importlib.util import os from pathlib import Path @@ -121,6 +122,15 @@ def test_reference_netcdf( xr.testing.assert_equal(ds.load(), daily_xarray_dataset) +@pytest.mark.xfail( + importlib.util.find_spec("cfgrib") is None, + reason=( + "Requires cfgrib, which should be installed via conda. " + "FIXME: Setup separate testing environment for `requires-conda` tests. " + "NOTE: The HRRR integration tests would also fall into this category." + ), + raises=ImportError, +) def test_reference_grib( pipeline, tmp_target_url, diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 00000000..70723834 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,81 @@ +import json +import os +import subprocess +import time +from pathlib import Path + +import pytest + +# Run only when the `--run-integration` option is passed. +# See also `pytest_addoption` in conftest. 
Reference:
+# https://jwodder.github.io/kbits/posts/pytest-mark-off/
+pytestmark = pytest.mark.skipif(
+    "not config.getoption('--run-integration')",
+    reason="Only run when --run-integration is given",
+)
+
+EXAMPLES = Path(__file__).parent.parent / "examples"
+
+
+def test_python_json_configs_identical():
+    """We provide examples of both Python and JSON config. By ensuring they are
+    identical, we can confidently use just one of them for the integration tests.
+    """
+    from pangeo_forge_runner.commands.base import BaseCommand  # type: ignore
+
+    python_, json_ = BaseCommand(), BaseCommand()
+    python_.load_config_file((EXAMPLES / "runner-config" / "local.py").absolute().as_posix())
+    json_.load_config_file((EXAMPLES / "runner-config" / "local.json").absolute().as_posix())
+
+    assert python_.config and json_.config  # make sure we actually loaded something
+    assert python_.config == json_.config
+
+
+@pytest.fixture
+def confpath(tmp_path_factory: pytest.TempPathFactory):
+    """The JSON config is easier to modify with tempdirs, so we use that here for
+    convenience. But we know it's the same as the Python config, because we test that.
+    """
+    tmp = tmp_path_factory.mktemp("tmp")
+    fname = "local.json"
+    dstpath = tmp / fname
+    with open(EXAMPLES / "runner-config" / fname) as src:
+        with dstpath.open(mode="w") as dst:
+            c = json.load(src)
+            c["TargetStorage"]["root_path"] = (tmp / "target").absolute().as_posix()
+            c["InputCacheStorage"]["root_path"] = (tmp / "cache").absolute().as_posix()
+            json.dump(c, dst)
+
+    return dstpath.absolute().as_posix()
+
+
+@pytest.mark.parametrize(
+    "recipe_id",
+    [
+        p.stem.replace("_", "-")
+        for p in (EXAMPLES / "feedstock").iterdir()
+        if p.suffix == ".py" and not p.stem.startswith("_")
+    ],
+)
+def test_integration(recipe_id: str, confpath: str):
+    """Run the example recipes in the ``examples/feedstock`` directory."""
+
+    xfails = {
+        "hrrr-kerchunk-concat-step": "WriteCombinedReference doesn't return zarr.storage.FSStore",
+        "hrrr-kerchunk-concat-valid-time": "Can't serialize drop_unknown callback function.",
+        "narr-opendap": "Hangs for unknown reason. Requires further debugging.",
+        "terraclimate": "Hangs for unknown reason. Requires further debugging.",
+    }
+    if recipe_id in xfails:
+        pytest.xfail(xfails[recipe_id])
+
+    bake_script = (EXAMPLES / "runner-commands" / "bake.sh").absolute().as_posix()
+    cmd = ["sh", bake_script]
+    env = os.environ.copy() | {
+        "REPO": EXAMPLES.absolute().as_posix(),
+        "CONFIG_FILE": confpath,
+        "RECIPE_ID": recipe_id,
+        "JOB_NAME": f"{recipe_id}-{str(int(time.time()))}",
+    }
+    proc = subprocess.run(cmd, capture_output=True, env=env)
+    assert proc.returncode == 0
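
The new tests/test_integration.py drives examples/runner-commands/bake.sh through subprocess, passing the repo, config file, recipe id, and job name as environment variables. The same flow can be reproduced by hand; the snippet below is a minimal sketch only, assuming pangeo-forge-recipes is installed with `python -m pip install -e ".[dev]"` plus pangeo-forge-runner (as in the integration workflow), and that it is run from the repository root. The recipe id "noaa-oisst" is just an illustrative choice from examples/feedstock/meta.yaml.

    # variables consumed by examples/runner-commands/bake.sh
    export REPO="$(pwd)/examples"                                  # the feedstock lives in examples/feedstock
    export CONFIG_FILE="$(pwd)/examples/runner-config/local.json"  # LocalDirectBakery config
    export RECIPE_ID="noaa-oisst"                                  # any id listed in meta.yaml
    export JOB_NAME="noaa-oisst-$(date +%s)"                       # timestamped, as in the integration test
    sh examples/runner-commands/bake.sh

With the local config, outputs land under ./target and cached inputs under ./cache; bake.sh passes --prune, which keeps the run to a small subset of inputs, mirroring what the integration test does with temporary directories.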