Skip to content

Commit

Permalink
Merge pull request #20 from danielfromearth/feature/issue-6
Browse files Browse the repository at this point in the history
Feature/issue 6
  • Loading branch information
danielfromearth authored Oct 10, 2023
2 parents 091aaa9 + 62cdf72 commit 407c978
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 115 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- [issue/13](https://github.com/danielfromearth/batchee/issues/13): Add simple command line interface for testing
- [issue/16](https://github.com/danielfromearth/batchee/issues/16): Add a logo
- [issue/6](https://github.com/danielfromearth/batchee/issues/6): Create Adapter code that processes a Harmony Message and STAC Catalog
### Changed
- [issue/11](https://github.com/danielfromearth/batchee/issues/11): Rename from concat_batcher to batchee
- [issue/21](https://github.com/danielfromearth/batchee/issues/21): Improve CICD workflows
Expand Down
Empty file added batcher/harmony/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion batcher/harmony_cli.py → batcher/harmony/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import harmony

from .harmony_adapter import ConcatBatching as HarmonyAdapter
from batcher.harmony.service_adapter import ConcatBatching as HarmonyAdapter


def main(argv, **kwargs):
Expand Down
108 changes: 108 additions & 0 deletions batcher/harmony/service_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from uuid import uuid4

import pystac
from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry
from pystac import Item
from pystac.item import Asset

from batcher.harmony.util import (
_get_netcdf_urls,
_get_output_bounding_box,
_get_output_date_range,
)
from batcher.tempo_filename_parser import get_batch_indices


class ConcatBatching(BaseHarmonyAdapter):
    """
    A harmony-service-lib wrapper around the concatenate-batcher module.
    This wrapper does not support Harmony calls that do not have STAC catalogs
    as support for this behavior is being deprecated in harmony-service-lib
    """

    def __init__(self, message, catalog=None, config=None):
        """
        Constructs the adapter

        Parameters
        ----------
        message : harmony.Message
            The Harmony input which needs acting upon
        catalog : pystac.Catalog
            A STAC catalog containing the files on which to act
        config : harmony.util.Config
            The configuration values for this runtime environment.
        """
        super().__init__(message, catalog=catalog, config=config)

    def invoke(self):
        """
        Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke

        Returns
        -------
        tuple
            The original Harmony message and the batched output STAC catalog.

        Raises
        ------
        RuntimeError
            If this adapter was invoked without a STAC catalog.
        """
        if not self.catalog:
            # Message-only support is being deprecated in Harmony, so we should expect to
            # only see requests with catalogs when invoked with a newer Harmony instance
            # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71
            raise RuntimeError("Invoking Batchee without a STAC catalog is not supported")

        return self.message, self.process_catalog(self.catalog)

    def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
        """Group the input catalog's granules into batches.

        Each batch becomes one output `pystac.Item` (representing a TEMPO
        scan) holding one `pystac.Asset` per granule in the batch.

        Parameters
        ----------
        catalog : pystac.Catalog
            Input catalog; items are gathered from it and any child or
            linked catalogs.

        Returns
        -------
        pystac.Catalog
            A new catalog containing one item per batch.
        """
        try:
            result = catalog.clone()
            result.id = str(uuid4())
            result.clear_children()

            # Get all the items from the catalog, including from child or linked catalogs
            items = list(self.get_all_catalog_items(catalog))

            # Quick return if catalog contains no items
            if not items:
                return result

            # --- Get granule filepaths (urls) ---
            netcdf_urls: list[str] = _get_netcdf_urls(items)

            # --- Map each granule to an index representing the batch to which it belongs ---
            batch_indices: list[int] = get_batch_indices(netcdf_urls)

            # --- Construct a dictionary with a separate key for each batch ---
            grouped: dict[int, list[Item]] = {}
            for batch_index, item in zip(batch_indices, items):
                grouped.setdefault(batch_index, []).append(item)

            # --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan),
            # and each Item holds multiple Assets (which represent each granule).
            result.clear_items()

            for batch_id, batch_items in grouped.items():
                batch_urls: list[str] = _get_netcdf_urls(batch_items)
                bounding_box = _get_output_bounding_box(batch_items)
                properties = _get_output_date_range(batch_items)

                # Construct a new pystac.Item with every granule in the batch as a pystac.Asset
                output_item = Item(
                    str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
                )

                for idx, item in enumerate(batch_items):
                    # Asset keys must be unique within an item: a fixed key
                    # (e.g. "data") would overwrite earlier assets, leaving
                    # only the final granule in each batch.
                    output_item.add_asset(
                        f"data_{idx}",
                        Asset(
                            batch_urls[idx],
                            title=batch_urls[idx],
                            media_type="application/x-netcdf4",
                            roles=["data"],
                        ),
                    )

                result.add_item(output_item)

            return result

        except Exception as service_exception:
            self.logger.error(service_exception, exc_info=True)
            # Bare `raise` preserves the original traceback for Harmony's error handling.
            raise
98 changes: 98 additions & 0 deletions batcher/harmony/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Misc utility functions"""
from datetime import datetime

from pystac import Asset, Item

VALID_EXTENSIONS = (".nc4", ".nc")
VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"]


def _is_netcdf_asset(asset: Asset) -> bool:
    """Determine whether a `pystac.Asset` refers to a NetCDF-4 granule.

    The asset's media type is checked first; if no media type is set, the
    decision falls back to the file extension of the asset's href.
    """
    if asset.media_type in VALID_MEDIA_TYPES:
        return True
    return asset.media_type is None and asset.href.lower().endswith(VALID_EXTENSIONS)


def _get_item_url(item: Item) -> str | None:
    """Return the href of the first asset on the `pystac.Item` that has the
    "data" role and a valid NetCDF format. Returns None when no asset matches.
    """
    for asset in item.assets.values():
        if "data" in (asset.roles or []) and _is_netcdf_asset(asset):
            return asset.href
    return None


def _get_netcdf_urls(items: list[Item]) -> list[str]:
    """Extract the NetCDF-4 data asset href from each `pystac.Item` in the
    input list. Raises an exception if any item lacks a qualifying asset,
    so the returned list always has one URL per input item.
    """
    urls: list[str] = []
    for item in items:
        item_url = _get_item_url(item)
        if item_url is None:
            raise RuntimeError("Some input granules do not have NetCDF-4 assets.")
        urls.append(item_url)
    return urls


def _get_output_bounding_box(input_items: list[Item]) -> list[float]:
    """Create a bounding box that is the maximum combined extent of all input
    `pystac.Item` bounding box extents.

    Parameters
    ----------
    input_items : list[Item]
        Items whose `bbox` values ([west, south, east, north]) are combined.

    Returns
    -------
    list[float]
        A new list [west, south, east, north] covering every input extent.
    """
    # Copy the first item's bbox rather than aliasing it: the original code
    # assigned `input_items[0].bbox` directly and then mutated that list in
    # place, corrupting the first input item's metadata.
    bounding_box = list(input_items[0].bbox)

    for item in input_items:
        bounding_box[0] = min(bounding_box[0], item.bbox[0])
        bounding_box[1] = min(bounding_box[1], item.bbox[1])
        bounding_box[2] = max(bounding_box[2], item.bbox[2])
        bounding_box[3] = max(bounding_box[3], item.bbox[3])

    return bounding_box


def _get_output_date_range(input_items: list[Item]) -> dict[str, str]:
    """Create a dictionary of start and end datetime, which encompasses the
    full temporal range of all input `pystac.Item` instances. This output
    dictionary will be used for the `properties` of the output Zarr store
    `pystac.Item`.
    """
    overall_start, overall_end = _get_item_date_range(input_items[0])

    # Widen the range to cover every remaining item.
    for item in input_items[1:]:
        item_start, item_end = _get_item_date_range(item)
        if item_start < overall_start:
            overall_start = item_start
        if item_end > overall_end:
            overall_end = item_end

    return {
        "start_datetime": overall_start.isoformat(),
        "end_datetime": overall_end.isoformat(),
    }


def _get_item_date_range(item: Item) -> tuple[datetime, datetime]:
    """A helper function to retrieve the temporal range from a `pystac.Item`
    instance. If the `pystac.Item.datetime` property exists, there is a
    single datetime associated with the granule, otherwise there will be a
    start and end time contained within the `pystac.Item` metadata.
    """
    if item.datetime is not None:
        # A single timestamp serves as both the start and the end.
        return item.datetime, item.datetime
    return item.common_metadata.start_datetime, item.common_metadata.end_datetime
104 changes: 0 additions & 104 deletions batcher/harmony_adapter.py

This file was deleted.

14 changes: 7 additions & 7 deletions batcher/tempo_filename_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
)


def get_unique_day_scan_categories(filenames: list) -> list[int]:
def get_batch_indices(filenames: list) -> list[int]:
"""
Returns
-------
list[int]
category integer for each filename in the original list, e.g. [0, 0, 0, 1, 1, 1, ...]
batch index for each filename in the original list, e.g. [0, 0, 0, 1, 1, 1, ...]
"""
# Make a new list with days and scans, e.g. [('20130701', 'S009'), ('20130701', 'S009'), ...]
day_and_scans: list[tuple[str, str]] = []
Expand All @@ -32,19 +32,19 @@ def get_unique_day_scan_categories(filenames: list) -> list[int]:
match_dict = matches.groupdict()
day_and_scans.append((match_dict["day_in_granule"], match_dict["daily_scan_id"]))

# Unique categories are determined, while keeping the same order
# Unique day-scans are determined (while keeping the same order). Each will be its own batch.
unique_day_scans: list[tuple[str, str]] = sorted(set(day_and_scans), key=day_and_scans.index)

# Map each day/scan to an integer
category_mapper: dict[tuple[str, str], int] = {
batch_mapper: dict[tuple[str, str], int] = {
day_scan: idx for idx, day_scan in enumerate(unique_day_scans)
}

# Generate a new list with the integer representation for each entry in the original list
return [category_mapper[day_scan] for day_scan in day_and_scans]
return [batch_mapper[day_scan] for day_scan in day_and_scans]


def main():
def main() -> list[list[str]]:
"""Main CLI entrypoint"""

parser = ArgumentParser(
Expand All @@ -69,7 +69,7 @@ def main():

input_filenames = [str(Path(f).resolve()) for f in args.file_names]

batch_indices = get_unique_day_scan_categories(input_filenames)
batch_indices = get_batch_indices(input_filenames)
unique_category_indices: list[int] = sorted(set(batch_indices), key=batch_indices.index)
logging.info(f"batch_indices = {batch_indices}")

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ruff = "^0.0.292"
coverage = "^7.3.2"

[tool.poetry.scripts]
batchee_harmony = 'batcher.harmony_cli:main'
batchee_harmony = 'batcher.harmony.cli:main'
batchee = 'batcher.tempo_filename_parser:main'

[build-system]
Expand Down
Loading

0 comments on commit 407c978

Please sign in to comment.