From 386f99d858b11401c4ced73527e00bdd20f3975c Mon Sep 17 00:00:00 2001 From: danielfromearth Date: Tue, 3 Oct 2023 14:01:28 -0400 Subject: [PATCH 1/3] rename variables and functions for readability --- batcher/harmony_adapter.py | 11 +++++++---- batcher/tempo_filename_parser.py | 10 +++++----- tests/test_filename_grouping.py | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/batcher/harmony_adapter.py b/batcher/harmony_adapter.py index 80c1976..8d3b1c4 100644 --- a/batcher/harmony_adapter.py +++ b/batcher/harmony_adapter.py @@ -2,7 +2,7 @@ from pystac import Catalog, Item from pystac.item import Asset -from batcher.tempo_filename_parser import get_unique_day_scan_categories +from batcher.tempo_filename_parser import get_batch_indices VALID_EXTENSIONS = (".nc4", ".nc") VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"] @@ -86,16 +86,19 @@ def invoke(self): def process_items_many_to_one(self): """Converts a list of STAC catalogs into a list of lists of STAC catalogs.""" try: + # --- Get granule filepaths (urls) --- items: list[Catalog] = list(self.get_all_catalog_items(self.catalog)) netcdf_urls: list[str] = _get_netcdf_urls(items) - batch_indices: list[int] = get_unique_day_scan_categories(netcdf_urls) - unique_category_indices: list[int] = sorted(set(batch_indices), key=batch_indices.index) + # --- Map each granule to an index representing the batch to which it belongs --- + batch_indices: list[int] = get_batch_indices(netcdf_urls) + unique_batch_indices: list[int] = sorted(set(batch_indices), key=batch_indices.index) + # --- Construct a STAC object based on the batch indices --- grouped: dict[int, list[Catalog]] = {} for k, v in zip(batch_indices, items): grouped.setdefault(k, []).append(v) - catalogs: list[list[Catalog]] = [grouped[k] for k in unique_category_indices] + catalogs: list[list[Catalog]] = [grouped[k] for k in unique_batch_indices] return catalogs diff --git a/batcher/tempo_filename_parser.py 
b/batcher/tempo_filename_parser.py index 89e8ed6..76e71fd 100644 --- a/batcher/tempo_filename_parser.py +++ b/batcher/tempo_filename_parser.py @@ -14,12 +14,12 @@ ) -def get_unique_day_scan_categories(filenames: list) -> list[int]: +def get_batch_indices(filenames: list) -> list[int]: """ Returns ------- list[int] - category integer for each filename in the original list, e.g. [0, 0, 0, 1, 1, 1, ...] + batch index for each filename in the original list, e.g. [0, 0, 0, 1, 1, 1, ...] """ # Make a new list with days and scans, e.g. [('20130701', 'S009'), ('20130701', 'S009'), ...] day_and_scans: list[tuple[str, str]] = [] @@ -29,13 +29,13 @@ def get_unique_day_scan_categories(filenames: list) -> list[int]: match_dict = matches.groupdict() day_and_scans.append((match_dict["day_in_granule"], match_dict["daily_scan_id"])) - # Unique categories are determined, while keeping the same order + # Unique day-scans are determined (while keeping the same order). Each will be its own batch. unique_day_scans: list[tuple[str, str]] = sorted(set(day_and_scans), key=day_and_scans.index) # Map each day/scan to an integer - category_mapper: dict[tuple[str, str], int] = { + batch_mapper: dict[tuple[str, str], int] = { day_scan: idx for idx, day_scan in enumerate(unique_day_scans) } # Generate a new list with the integer representation for each entry in the original list - return [category_mapper[day_scan] for day_scan in day_and_scans] + return [batch_mapper[day_scan] for day_scan in day_and_scans] diff --git a/tests/test_filename_grouping.py b/tests/test_filename_grouping.py index 945a79a..83ae365 100644 --- a/tests/test_filename_grouping.py +++ b/tests/test_filename_grouping.py @@ -1,4 +1,4 @@ -from batcher.tempo_filename_parser import get_unique_day_scan_categories +from batcher.tempo_filename_parser import get_batch_indices example_filenames = [ "TEMPO_HCHO_L2_V01_20130701T212354Z_S009G05.nc", @@ -11,6 +11,6 @@ def test_grouping(): - results = 
get_unique_day_scan_categories(example_filenames) + results = get_batch_indices(example_filenames) assert results == [0, 0, 0, 1, 1, 1] From dc49a856caf67f4391c0d770b58b6daa57602b27 Mon Sep 17 00:00:00 2001 From: danielfromearth Date: Thu, 5 Oct 2023 13:00:06 -0400 Subject: [PATCH 2/3] draft code for adapter --- batcher/harmony/__init__.py | 0 batcher/{harmony_cli.py => harmony/cli.py} | 2 +- batcher/harmony/service_adapter.py | 108 +++++++++++++++++++++ batcher/harmony/util.py | 98 +++++++++++++++++++ batcher/harmony_adapter.py | 107 -------------------- batcher/tempo_filename_parser.py | 2 +- pyproject.toml | 2 +- 7 files changed, 209 insertions(+), 110 deletions(-) create mode 100644 batcher/harmony/__init__.py rename batcher/{harmony_cli.py => harmony/cli.py} (91%) create mode 100644 batcher/harmony/service_adapter.py create mode 100644 batcher/harmony/util.py delete mode 100644 batcher/harmony_adapter.py diff --git a/batcher/harmony/__init__.py b/batcher/harmony/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/batcher/harmony_cli.py b/batcher/harmony/cli.py similarity index 91% rename from batcher/harmony_cli.py rename to batcher/harmony/cli.py index cec149e..61756ff 100644 --- a/batcher/harmony_cli.py +++ b/batcher/harmony/cli.py @@ -4,7 +4,7 @@ import harmony -from .harmony_adapter import ConcatBatching as HarmonyAdapter +from batcher.harmony.service_adapter import ConcatBatching as HarmonyAdapter def main(argv, **kwargs): diff --git a/batcher/harmony/service_adapter.py b/batcher/harmony/service_adapter.py new file mode 100644 index 0000000..f6078fa --- /dev/null +++ b/batcher/harmony/service_adapter.py @@ -0,0 +1,108 @@ +from uuid import uuid4 + +import pystac +from harmony.adapter import BaseHarmonyAdapter +from harmony.util import bbox_to_geometry +from pystac import Item +from pystac.item import Asset + +from batcher.harmony.util import ( + _get_netcdf_urls, + _get_output_bounding_box, + _get_output_date_range, +) +from 
batcher.tempo_filename_parser import get_batch_indices + + +class ConcatBatching(BaseHarmonyAdapter): + """ + A harmony-service-lib wrapper around the concatenate-batcher module. + This wrapper does not support Harmony calls that do not have STAC catalogs + as support for this behavior is being deprecated in harmony-service-lib + """ + + def __init__(self, message, catalog=None, config=None): + """ + Constructs the adapter + + Parameters + ---------- + message : harmony.Message + The Harmony input which needs acting upon + catalog : pystac.Catalog + A STAC catalog containing the files on which to act + config : harmony.util.Config + The configuration values for this runtime environment. + """ + super().__init__(message, catalog=catalog, config=config) + + def invoke(self): + """ + Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke + """ + if not self.catalog: + # Message-only support is being deprecated in Harmony, so we should expect to + # only see requests with catalogs when invoked with a newer Harmony instance + # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71 + raise RuntimeError("Invoking Batchee without a STAC catalog is not supported") + + return self.message, self.process_catalog(self.catalog) + + def process_catalog(self, catalog: pystac.Catalog): + """Converts a list of STAC catalogs into a list of lists of STAC catalogs.""" + try: + result = catalog.clone() + result.id = str(uuid4()) + result.clear_children() + + # Get all the items from the catalog, including from child or linked catalogs + items = list(self.get_all_catalog_items(catalog)) + + # Quick return if catalog contains no items + if len(items) == 0: + return result + + # # --- Get granule filepaths (urls) --- + netcdf_urls: list[str] = _get_netcdf_urls(items) + + # --- Map each granule to an index representing the batch to which it belongs --- + batch_indices: list[int] =
get_batch_indices(netcdf_urls) + sorted(set(batch_indices), key=batch_indices.index) + + # --- Construct a dictionary with a separate key for each batch --- + grouped: dict[int, list[Item]] = {} + for k, v in zip(batch_indices, items): + grouped.setdefault(k, []).append(v) + + # --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan), + # and each Item holds multiple Assets (which represent each granule). + result.clear_items() + + for batch_id, batch_items in grouped.items(): + batch_urls: list[str] = _get_netcdf_urls(batch_items) + bounding_box = _get_output_bounding_box(batch_items) + properties = _get_output_date_range(batch_items) + + # Construct a new pystac.Item with every granule in the batch as a pystac.Asset + output_item = Item( + str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties + ) + + for idx, item in enumerate(batch_items): + output_item.add_asset( + "data", + Asset( + batch_urls[idx], + title=batch_urls[idx], + media_type="application/x-netcdf4", + roles=["data"], + ), + ) + + result.add_item(output_item) + + return result + + except Exception as service_exception: + self.logger.error(service_exception, exc_info=1) + raise service_exception diff --git a/batcher/harmony/util.py b/batcher/harmony/util.py new file mode 100644 index 0000000..f77aeb9 --- /dev/null +++ b/batcher/harmony/util.py @@ -0,0 +1,98 @@ +"""Misc utility functions""" +from datetime import datetime + +from pystac import Asset, Item + +VALID_EXTENSIONS = (".nc4", ".nc") +VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"] + + +def _is_netcdf_asset(asset: Asset) -> bool: + """Check that a `pystac.Asset` is a valid NetCDF-4 granule. This can be + ascertained via either the media type or by checking the extension of + granule itself if that media type is absent. 
+ + """ + return asset.media_type in VALID_MEDIA_TYPES or ( + asset.media_type is None and asset.href.lower().endswith(VALID_EXTENSIONS) + ) + + +def _get_item_url(item: Item) -> str | None: + """Check the `pystac.Item` for the first asset with the `data` role and a + valid input format. If there are no matching assets, return None + + """ + return next( + ( + asset.href + for asset in item.assets.values() + if "data" in (asset.roles or []) and _is_netcdf_asset(asset) + ), + None, + ) + + +def _get_netcdf_urls(items: list[Item]) -> list[str]: + """Iterate through a list of `pystac.Item` instances, from the input + `pystac.Catalog`. Extract the `pystac.Asset.href` for the first asset + of each item that has a role of "data". If there are any items that do + not have a data asset, then raise an exception. + + """ + catalog_urls = [_get_item_url(item) for item in items] + + if None in catalog_urls: + raise RuntimeError("Some input granules do not have NetCDF-4 assets.") + + return catalog_urls # type: ignore[return-value] + + +def _get_output_bounding_box(input_items: list[Item]) -> list[float]: + """Create a bounding box that is the maximum combined extent of all input + `pystac.Item` bounding box extents. + + """ + bounding_box = input_items[0].bbox + + for item in input_items: + bounding_box[0] = min(bounding_box[0], item.bbox[0]) + bounding_box[1] = min(bounding_box[1], item.bbox[1]) + bounding_box[2] = max(bounding_box[2], item.bbox[2]) + bounding_box[3] = max(bounding_box[3], item.bbox[3]) + + return bounding_box + + +def _get_output_date_range(input_items: list[Item]) -> dict[str, str]: + """Create a dictionary of start and end datetime, which encompasses the + full temporal range of all input `pystac.Item` instances. This output + dictionary will be used for the `properties` of the output Zarr store + `pystac.Item`. 
+ + """ + start_datetime, end_datetime = _get_item_date_range(input_items[0]) + + for item in input_items: + new_start_datetime, new_end_datetime = _get_item_date_range(item) + start_datetime = min(start_datetime, new_start_datetime) + end_datetime = max(end_datetime, new_end_datetime) + + return {"start_datetime": start_datetime.isoformat(), "end_datetime": end_datetime.isoformat()} + + +def _get_item_date_range(item: Item) -> tuple[datetime, datetime]: + """A helper function to retrieve the temporal range from a `pystac.Item` + instance. If the `pystac.Item.datetime` property exists, there is a + single datetime associated with the granule, otherwise there will be a + start and end time contained within the `pystac.Item` metadata. + + """ + if item.datetime is None: + start_datetime = item.common_metadata.start_datetime + end_datetime = item.common_metadata.end_datetime + else: + start_datetime = item.datetime + end_datetime = item.datetime + + return start_datetime, end_datetime diff --git a/batcher/harmony_adapter.py b/batcher/harmony_adapter.py deleted file mode 100644 index 8d3b1c4..0000000 --- a/batcher/harmony_adapter.py +++ /dev/null @@ -1,107 +0,0 @@ -from harmony.adapter import BaseHarmonyAdapter -from pystac import Catalog, Item -from pystac.item import Asset - -from batcher.tempo_filename_parser import get_batch_indices - -VALID_EXTENSIONS = (".nc4", ".nc") -VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"] - - -def _is_netcdf_asset(asset: Asset) -> bool: - """Check that a `pystac.Asset` is a valid NetCDF-4 granule. This can be - ascertained via either the media type or by checking the extension of - granule itself if that media type is absent. - - """ - return asset.media_type in VALID_MEDIA_TYPES or ( - asset.media_type is None and asset.href.lower().endswith(VALID_EXTENSIONS) - ) - - -def _get_item_url(item: Item) -> str | None: - """Check the `pystac.Item` for the first asset with the `data` role and a - valid input format. 
If there are no matching assets, return None - - """ - return next( - ( - asset.href - for asset in item.assets.values() - if "data" in (asset.roles or []) and _is_netcdf_asset(asset) - ), - None, - ) - - -def _get_netcdf_urls(items: list[Item]) -> list[str]: - """Iterate through a list of `pystac.Item` instances, from the input - `pystac.Catalog`. Extract the `pystac.Asset.href` for the first asset - of each item that has a role of "data". If there are any items that do - not have a data asset, then raise an exception. - - """ - catalog_urls = [_get_item_url(item) for item in items] - - if None in catalog_urls: - raise RuntimeError("Some input granules do not have NetCDF-4 assets.") - - return catalog_urls # type: ignore[return-value] - - -class ConcatBatching(BaseHarmonyAdapter): - """ - A harmony-service-lib wrapper around the concatenate-batcher module. - This wrapper does not support Harmony calls that do not have STAC catalogs - as support for this behavior is being depreciated in harmony-service-lib - """ - - def __init__(self, message, catalog=None, config=None): - """ - Constructs the adapter - - Parameters - ---------- - message : harmony.Message - The Harmony input which needs acting upon - catalog : pystac.Catalog - A STAC catalog containing the files on which to act - config : harmony.util.Config - The configuration values for this runtime environment. - """ - super().__init__(message, catalog=catalog, config=config) - - def invoke(self): - """ - Primary entrypoint into the service wrapper. 
Overrides BaseHarmonyAdapter.invoke - """ - if not self.catalog: - # Message-only support is being depreciated in Harmony, so we should expect to - # only see requests with catalogs when invoked with a newer Harmony instance - # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71 - raise RuntimeError("Invoking Batcher without a STAC catalog is not supported") - - return self.message, self.process_items_many_to_one() - - def process_items_many_to_one(self): - """Converts a list of STAC catalogs into a list of lists of STAC catalogs.""" - try: - # --- Get granule filepaths (urls) --- - items: list[Catalog] = list(self.get_all_catalog_items(self.catalog)) - netcdf_urls: list[str] = _get_netcdf_urls(items) - - # --- Map each granule to an index representing the batch to which it belongs --- - batch_indices: list[int] = get_batch_indices(netcdf_urls) - unique_batch_indices: list[int] = sorted(set(batch_indices), key=batch_indices.index) - - # --- Construct a STAC object based on the batch indices --- - grouped: dict[int, list[Catalog]] = {} - for k, v in zip(batch_indices, items): - grouped.setdefault(k, []).append(v) - catalogs: list[list[Catalog]] = [grouped[k] for k in unique_batch_indices] - - return catalogs - - except Exception as service_exception: - self.logger.error(service_exception, exc_info=1) - raise service_exception diff --git a/batcher/tempo_filename_parser.py b/batcher/tempo_filename_parser.py index 6aafca9..7d6dbe4 100644 --- a/batcher/tempo_filename_parser.py +++ b/batcher/tempo_filename_parser.py @@ -44,7 +44,7 @@ def get_batch_indices(filenames: list) -> list[int]: return [batch_mapper[day_scan] for day_scan in day_and_scans] -def main(): +def main() -> list[list[str]]: """Main CLI entrypoint""" parser = ArgumentParser( diff --git a/pyproject.toml b/pyproject.toml index 3399aed..15bf7d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ ruff = "^0.0.292" coverage = 
"^7.3.2" [tool.poetry.scripts] -batchee_harmony = 'batcher.harmony_cli:main' +batchee_harmony = 'batcher.harmony.cli:main' batchee = 'batcher.tempo_filename_parser:main' [build-system] From 62cdf726c308ca5fe1189edc53bcacd73c7feec1 Mon Sep 17 00:00:00 2001 From: danielfromearth Date: Thu, 5 Oct 2023 13:29:38 -0400 Subject: [PATCH 3/3] update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8822f6..43c37c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - [issue/13](https://github.com/danielfromearth/batchee/issues/13): Add simple command line interface for testing - [issue/16](https://github.com/danielfromearth/batchee/issues/16): Add a logo +- [issue/6](https://github.com/danielfromearth/batchee/issues/6): Create Adapter code that processes a Harmony Message and STAC Catalog ### Changed - [issue/11](https://github.com/danielfromearth/batchee/issues/11): Rename from concat_batcher to batchee ### Deprecated