Skip to content

Commit

Permalink
draft code for adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfromearth committed Oct 5, 2023
1 parent 8c7b4cc commit dc49a85
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 110 deletions.
Empty file added batcher/harmony/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion batcher/harmony_cli.py → batcher/harmony/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import harmony

from .harmony_adapter import ConcatBatching as HarmonyAdapter
from batcher.harmony.service_adapter import ConcatBatching as HarmonyAdapter


def main(argv, **kwargs):
Expand Down
108 changes: 108 additions & 0 deletions batcher/harmony/service_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from uuid import uuid4

import pystac
from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry
from pystac import Item
from pystac.item import Asset

from batcher.harmony.util import (
_get_netcdf_urls,
_get_output_bounding_box,
_get_output_date_range,
)
from batcher.tempo_filename_parser import get_batch_indices


class ConcatBatching(BaseHarmonyAdapter):
    """
    A harmony-service-lib wrapper around the concatenate-batcher module.

    This wrapper does not support Harmony calls that do not have STAC catalogs,
    as support for this behavior is being deprecated in harmony-service-lib.
    """

    def __init__(self, message, catalog=None, config=None):
        """
        Constructs the adapter

        Parameters
        ----------
        message : harmony.Message
            The Harmony input which needs acting upon
        catalog : pystac.Catalog
            A STAC catalog containing the files on which to act
        config : harmony.util.Config
            The configuration values for this runtime environment.
        """
        super().__init__(message, catalog=catalog, config=config)

    def invoke(self):
        """
        Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke

        Returns
        -------
        tuple
            The Harmony message, and the output catalog built by `process_catalog`.

        Raises
        ------
        RuntimeError
            If the adapter was constructed without a STAC catalog.
        """
        if not self.catalog:
            # Message-only support is being deprecated in Harmony, so we should expect to
            # only see requests with catalogs when invoked with a newer Harmony instance
            # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71
            raise RuntimeError("Invoking Batchee without a STAC catalog is not supported")

        return self.message, self.process_catalog(self.catalog)

    def process_catalog(self, catalog: pystac.Catalog):
        """
        Groups the granules of a STAC catalog into batches.

        The output is a clone of the input catalog (with a new id) that holds
        one Item per batch; each of those Items holds one Asset per granule
        that was assigned to the batch.

        Parameters
        ----------
        catalog : pystac.Catalog
            The input catalog, whose items each reference one NetCDF granule

        Returns
        -------
        pystac.Catalog
            The batched catalog. If the input contains no items, the cloned
            catalog is returned without any batching applied.

        Raises
        ------
        Exception
            Any exception raised while batching is logged and then re-raised.
        """
        try:
            result = catalog.clone()
            result.id = str(uuid4())
            result.clear_children()

            # Get all the items from the catalog, including from child or linked catalogs
            items = list(self.get_all_catalog_items(catalog))

            # Quick return if catalog contains no items
            if not items:
                return result

            # --- Get granule filepaths (urls) ---
            netcdf_urls: list[str] = _get_netcdf_urls(items)

            # --- Map each granule to an index representing the batch to which it belongs ---
            batch_indices: list[int] = get_batch_indices(netcdf_urls)

            # --- Construct a dictionary with a separate key for each batch ---
            grouped: dict[int, list[Item]] = {}
            for batch_index, item in zip(batch_indices, items):
                grouped.setdefault(batch_index, []).append(item)

            # --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan),
            #     and each Item holds multiple Assets (which represent each granule).
            result.clear_items()

            for batch_items in grouped.values():
                batch_urls: list[str] = _get_netcdf_urls(batch_items)
                bounding_box = _get_output_bounding_box(batch_items)
                properties = _get_output_date_range(batch_items)

                # Construct a new pystac.Item with every granule in the batch as a pystac.Asset
                output_item = Item(
                    str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
                )

                for idx, url in enumerate(batch_urls):
                    # Asset keys must be unique within an Item: reusing one key
                    # would overwrite the previous granule, leaving only the
                    # last granule of the batch in the output.
                    output_item.add_asset(
                        f"data_{idx}",
                        Asset(
                            url,
                            title=url,
                            media_type="application/x-netcdf4",
                            roles=["data"],
                        ),
                    )

                result.add_item(output_item)

            return result

        except Exception as service_exception:
            self.logger.error(service_exception, exc_info=1)
            # Bare raise re-raises the active exception with its original traceback.
            raise
98 changes: 98 additions & 0 deletions batcher/harmony/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Misc utility functions"""
from datetime import datetime

from pystac import Asset, Item

VALID_EXTENSIONS = (".nc4", ".nc")
VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"]


def _is_netcdf_asset(asset: Asset) -> bool:
    """Decide whether a `pystac.Asset` refers to a NetCDF granule.

    When the asset declares a media type, that media type alone decides the
    outcome. Only when no media type is present does the (case-insensitive)
    file extension of the asset's href get consulted instead.

    """
    declared_type = asset.media_type

    if declared_type is None:
        # No media type available: fall back to the href's extension.
        return asset.href.lower().endswith(VALID_EXTENSIONS)

    return declared_type in VALID_MEDIA_TYPES


def _get_item_url(item: Item) -> str | None:
    """Return the href of the first asset in the `pystac.Item` that both has
    the `data` role and is in a valid input format. Returns None when no
    asset matches.

    """
    for candidate in item.assets.values():
        # An asset without roles is treated as having an empty role list.
        roles = candidate.roles or []
        if "data" in roles and _is_netcdf_asset(candidate):
            return candidate.href

    return None


def _get_netcdf_urls(items: list[Item]) -> list[str]:
    """Collect one granule URL per `pystac.Item`, in input order.

    The URL for each item is whatever `_get_item_url` finds for it. Every
    item is resolved first; if any of them yielded no URL, a `RuntimeError`
    is raised instead of returning a partial list.

    """
    resolved_urls = list(map(_get_item_url, items))

    if any(url is None for url in resolved_urls):
        raise RuntimeError("Some input granules do not have NetCDF-4 assets.")

    return resolved_urls  # type: ignore[return-value]


def _get_output_bounding_box(input_items: list[Item]) -> list[float]:
    """Create a bounding box that is the maximum combined extent of all input
    `pystac.Item` bounding box extents.

    Indices 0 and 1 of each bounding box are minimised, and indices 2 and 3
    are maximised. The result is a new list: the bounding boxes of the input
    items are left untouched.

    Raises an `IndexError` if `input_items` is empty.

    """
    # Work on a copy; otherwise the first item's own bbox would be widened
    # in place as a side effect of computing the combined extent.
    bounding_box = list(input_items[0].bbox)

    for item in input_items[1:]:
        bounding_box[0] = min(bounding_box[0], item.bbox[0])
        bounding_box[1] = min(bounding_box[1], item.bbox[1])
        bounding_box[2] = max(bounding_box[2], item.bbox[2])
        bounding_box[3] = max(bounding_box[3], item.bbox[3])

    return bounding_box


def _get_output_date_range(input_items: list[Item]) -> dict[str, str]:
    """Compute the overall temporal extent of the input `pystac.Item`
    instances.

    Returns a dictionary with `start_datetime` (the earliest start) and
    `end_datetime` (the latest end), both as ISO 8601 strings.

    """
    item_ranges = [_get_item_date_range(item) for item in input_items]

    # Seed with the first item's range, then widen it with the others.
    earliest, latest = item_ranges[0]
    for begin, finish in item_ranges[1:]:
        if begin < earliest:
            earliest = begin
        if finish > latest:
            latest = finish

    return {"start_datetime": earliest.isoformat(), "end_datetime": latest.isoformat()}


def _get_item_date_range(item: Item) -> tuple[datetime, datetime]:
    """Return the (start, end) temporal range of a `pystac.Item`.

    An item whose `datetime` is set yields that value as both start and end.
    Otherwise the start and end are read from the item's common metadata.

    """
    instant = item.datetime
    if instant is not None:
        return instant, instant

    metadata = item.common_metadata
    return metadata.start_datetime, metadata.end_datetime
107 changes: 0 additions & 107 deletions batcher/harmony_adapter.py

This file was deleted.

2 changes: 1 addition & 1 deletion batcher/tempo_filename_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def get_batch_indices(filenames: list) -> list[int]:
return [batch_mapper[day_scan] for day_scan in day_and_scans]


def main():
def main() -> list[list[str]]:
"""Main CLI entrypoint"""

parser = ArgumentParser(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ruff = "^0.0.292"
coverage = "^7.3.2"

[tool.poetry.scripts]
batchee_harmony = 'batcher.harmony_cli:main'
batchee_harmony = 'batcher.harmony.cli:main'
batchee = 'batcher.tempo_filename_parser:main'

[build-system]
Expand Down

0 comments on commit dc49a85

Please sign in to comment.