Feature/issue 6 #20

Merged
merged 4 commits into from Oct 10, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- [issue/13](https://github.com/danielfromearth/batchee/issues/13): Add simple command line interface for testing
- [issue/16](https://github.com/danielfromearth/batchee/issues/16): Add a logo
- [issue/6](https://github.com/danielfromearth/batchee/issues/6): Create Adapter code that processes a Harmony Message and STAC Catalog
### Changed
- [issue/11](https://github.com/danielfromearth/batchee/issues/11): Rename from concat_batcher to batchee
### Deprecated
Empty file added batcher/harmony/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion batcher/harmony_cli.py → batcher/harmony/cli.py
@@ -4,7 +4,7 @@

import harmony

-from .harmony_adapter import ConcatBatching as HarmonyAdapter
+from batcher.harmony.service_adapter import ConcatBatching as HarmonyAdapter


def main(argv, **kwargs):
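The remainder of cli.py is unchanged and not shown in this diff. For orientation, below is a minimal sketch of how a harmony-service-lib `main` typically wires the renamed adapter into the Harmony CLI; the helper calls (`harmony.setup_cli`, `harmony.is_harmony_cli`, `harmony.run_cli`) and the parser description are assumptions based on common harmony-service-lib usage, not taken from this PR.

from argparse import ArgumentParser

import harmony

from batcher.harmony.service_adapter import ConcatBatching as HarmonyAdapter


def main(argv, **kwargs):
    """Illustrative sketch of a harmony-service-lib CLI entrypoint (assumed shape)."""
    config = kwargs.get("config", None)

    parser = ArgumentParser(prog="batchee_harmony", description="Run the batchee Harmony service")
    harmony.setup_cli(parser)  # registers the standard Harmony CLI arguments
    args = parser.parse_args(argv[1:])

    if harmony.is_harmony_cli(args):
        harmony.run_cli(parser, args, HarmonyAdapter, cfg=config)
    else:
        parser.error("Only --harmony CLIs are supported")
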
108 changes: 108 additions & 0 deletions batcher/harmony/service_adapter.py
@@ -0,0 +1,108 @@
from uuid import uuid4

import pystac
from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry
from pystac import Item
from pystac.item import Asset

from batcher.harmony.util import (
_get_netcdf_urls,
_get_output_bounding_box,
_get_output_date_range,
)
from batcher.tempo_filename_parser import get_batch_indices


class ConcatBatching(BaseHarmonyAdapter):
"""
A harmony-service-lib wrapper around the concatenate-batcher module.
    This wrapper does not support Harmony calls that do not have STAC catalogs,
    as support for that behavior is being deprecated in harmony-service-lib.
"""

def __init__(self, message, catalog=None, config=None):
"""
Constructs the adapter

Parameters
----------
message : harmony.Message
The Harmony input which needs acting upon
catalog : pystac.Catalog
A STAC catalog containing the files on which to act
config : harmony.util.Config
The configuration values for this runtime environment.
"""
super().__init__(message, catalog=catalog, config=config)

def invoke(self):
"""
Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke
"""
if not self.catalog:
            # Message-only support is being deprecated in Harmony, so we should expect to
# only see requests with catalogs when invoked with a newer Harmony instance
# https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71
raise RuntimeError("Invoking Batchee without a STAC catalog is not supported")

return self.message, self.process_catalog(self.catalog)

def process_catalog(self, catalog: pystac.Catalog):
"""Converts a list of STAC catalogs into a list of lists of STAC catalogs."""
try:
result = catalog.clone()
result.id = str(uuid4())
result.clear_children()

# Get all the items from the catalog, including from child or linked catalogs
items = list(self.get_all_catalog_items(catalog))

# Quick return if catalog contains no items
if len(items) == 0:
return result

            # --- Get granule filepaths (URLs) ---
netcdf_urls: list[str] = _get_netcdf_urls(items)

# --- Map each granule to an index representing the batch to which it belongs ---
batch_indices: list[int] = get_batch_indices(netcdf_urls)

# --- Construct a dictionary with a separate key for each batch ---
grouped: dict[int, list[Item]] = {}
for k, v in zip(batch_indices, items):
grouped.setdefault(k, []).append(v)

# --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan),
# and each Item holds multiple Assets (which represent each granule).
result.clear_items()

for batch_id, batch_items in grouped.items():
batch_urls: list[str] = _get_netcdf_urls(batch_items)
bounding_box = _get_output_bounding_box(batch_items)
properties = _get_output_date_range(batch_items)

# Construct a new pystac.Item with every granule in the batch as a pystac.Asset
output_item = Item(
str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
)

                for idx, item in enumerate(batch_items):
                    # Key each asset uniquely; reusing a single "data" key would
                    # overwrite earlier granules in the item's asset dictionary.
                    output_item.add_asset(
                        f"data_{idx}",
Asset(
batch_urls[idx],
title=batch_urls[idx],
media_type="application/x-netcdf4",
roles=["data"],
),
)

result.add_item(output_item)

return result

except Exception as service_exception:
self.logger.error(service_exception, exc_info=1)
raise service_exception
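As a standalone illustration of the grouping step in process_catalog above (not part of this PR), the same setdefault-based batching is shown below with plain strings standing in for pystac.Item objects:

# Illustrative sketch: plain strings stand in for pystac.Item objects.
items = ["granule_a.nc", "granule_b.nc", "granule_c.nc", "granule_d.nc"]
batch_indices = [0, 0, 1, 1]  # one batch index per granule, as returned by get_batch_indices()

grouped: dict[int, list[str]] = {}
for k, v in zip(batch_indices, items):
    grouped.setdefault(k, []).append(v)

print(grouped)
# {0: ['granule_a.nc', 'granule_b.nc'], 1: ['granule_c.nc', 'granule_d.nc']}
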
98 changes: 98 additions & 0 deletions batcher/harmony/util.py
@@ -0,0 +1,98 @@
"""Misc utility functions"""
from datetime import datetime

from pystac import Asset, Item

VALID_EXTENSIONS = (".nc4", ".nc")
VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"]


def _is_netcdf_asset(asset: Asset) -> bool:
"""Check that a `pystac.Asset` is a valid NetCDF-4 granule. This can be
    ascertained via either the media type or by checking the extension of
    the granule itself if that media type is absent.

"""
return asset.media_type in VALID_MEDIA_TYPES or (
asset.media_type is None and asset.href.lower().endswith(VALID_EXTENSIONS)
)


def _get_item_url(item: Item) -> str | None:
"""Check the `pystac.Item` for the first asset with the `data` role and a
valid input format. If there are no matching assets, return None

"""
return next(
(
asset.href
for asset in item.assets.values()
if "data" in (asset.roles or []) and _is_netcdf_asset(asset)
),
None,
)


def _get_netcdf_urls(items: list[Item]) -> list[str]:
"""Iterate through a list of `pystac.Item` instances, from the input
`pystac.Catalog`. Extract the `pystac.Asset.href` for the first asset
of each item that has a role of "data". If there are any items that do
not have a data asset, then raise an exception.

"""
catalog_urls = [_get_item_url(item) for item in items]

if None in catalog_urls:
raise RuntimeError("Some input granules do not have NetCDF-4 assets.")

return catalog_urls # type: ignore[return-value]


def _get_output_bounding_box(input_items: list[Item]) -> list[float]:
"""Create a bounding box that is the maximum combined extent of all input
`pystac.Item` bounding box extents.

"""
    # Copy the first item's bbox so that it is not mutated in place below
    bounding_box = list(input_items[0].bbox)

for item in input_items:
bounding_box[0] = min(bounding_box[0], item.bbox[0])
bounding_box[1] = min(bounding_box[1], item.bbox[1])
bounding_box[2] = max(bounding_box[2], item.bbox[2])
bounding_box[3] = max(bounding_box[3], item.bbox[3])

return bounding_box


def _get_output_date_range(input_items: list[Item]) -> dict[str, str]:
"""Create a dictionary of start and end datetime, which encompasses the
full temporal range of all input `pystac.Item` instances. This output
    dictionary will be used for the `properties` of the output
    `pystac.Item`.

"""
start_datetime, end_datetime = _get_item_date_range(input_items[0])

for item in input_items:
new_start_datetime, new_end_datetime = _get_item_date_range(item)
start_datetime = min(start_datetime, new_start_datetime)
end_datetime = max(end_datetime, new_end_datetime)

return {"start_datetime": start_datetime.isoformat(), "end_datetime": end_datetime.isoformat()}


def _get_item_date_range(item: Item) -> tuple[datetime, datetime]:
"""A helper function to retrieve the temporal range from a `pystac.Item`
instance. If the `pystac.Item.datetime` property exists, there is a
single datetime associated with the granule, otherwise there will be a
start and end time contained within the `pystac.Item` metadata.

"""
if item.datetime is None:
start_datetime = item.common_metadata.start_datetime
end_datetime = item.common_metadata.end_datetime
else:
start_datetime = item.datetime
end_datetime = item.datetime

return start_datetime, end_datetime
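For illustration (not part of this PR), a short sketch of how _is_netcdf_asset treats media types versus file extensions; the S3 hrefs below are hypothetical:

from pystac import Asset

from batcher.harmony.util import _is_netcdf_asset

# Recognised by media type, regardless of extension
assert _is_netcdf_asset(Asset("s3://bucket/granule.dat", media_type="application/x-netcdf4"))

# No media type set, so the (case-insensitive) file extension is checked instead
assert _is_netcdf_asset(Asset("s3://bucket/granule.NC"))

# Neither a NetCDF media type nor a NetCDF extension
assert not _is_netcdf_asset(Asset("s3://bucket/granule.tif", media_type="image/tiff"))
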
104 changes: 0 additions & 104 deletions batcher/harmony_adapter.py

This file was deleted.

14 changes: 7 additions & 7 deletions batcher/tempo_filename_parser.py
@@ -17,12 +17,12 @@
)


-def get_unique_day_scan_categories(filenames: list) -> list[int]:
+def get_batch_indices(filenames: list) -> list[int]:
"""
Returns
-------
list[int]
-        category integer for each filename in the original list, e.g. [0, 0, 0, 1, 1, 1, ...]
+        batch index for each filename in the original list, e.g. [0, 0, 0, 1, 1, 1, ...]
"""
# Make a new list with days and scans, e.g. [('20130701', 'S009'), ('20130701', 'S009'), ...]
day_and_scans: list[tuple[str, str]] = []
@@ -32,19 +32,19 @@ def get_unique_day_scan_categories(filenames: list) -> list[int]:
match_dict = matches.groupdict()
day_and_scans.append((match_dict["day_in_granule"], match_dict["daily_scan_id"]))

-    # Unique categories are determined, while keeping the same order
+    # Unique day-scans are determined (while keeping the same order). Each will be its own batch.
unique_day_scans: list[tuple[str, str]] = sorted(set(day_and_scans), key=day_and_scans.index)

# Map each day/scan to an integer
-    category_mapper: dict[tuple[str, str], int] = {
+    batch_mapper: dict[tuple[str, str], int] = {
day_scan: idx for idx, day_scan in enumerate(unique_day_scans)
}

# Generate a new list with the integer representation for each entry in the original list
-    return [category_mapper[day_scan] for day_scan in day_and_scans]
+    return [batch_mapper[day_scan] for day_scan in day_and_scans]


-def main():
+def main() -> list[list[str]]:
"""Main CLI entrypoint"""

parser = ArgumentParser(
@@ -69,7 +69,7 @@ def main():

input_filenames = [str(Path(f).resolve()) for f in args.file_names]

-    batch_indices = get_unique_day_scan_categories(input_filenames)
+    batch_indices = get_batch_indices(input_filenames)
unique_category_indices: list[int] = sorted(set(batch_indices), key=batch_indices.index)
logging.info(f"batch_indices = {batch_indices}")

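The TEMPO filename regex lives in the unchanged part of this module and is not shown in this diff. As a standalone illustration of the renamed get_batch_indices logic (not part of this PR), here is the same ordered day/scan-to-index mapping applied to already-parsed (day, scan) tuples:

# Illustrative sketch: already-parsed (day, scan) tuples stand in for parsed filenames.
day_and_scans = [
    ("20130701", "S009"),
    ("20130701", "S009"),
    ("20130701", "S010"),
    ("20130702", "S001"),
    ("20130701", "S010"),
]

# Unique day/scan pairs, preserving first-seen order; each becomes its own batch
unique_day_scans = sorted(set(day_and_scans), key=day_and_scans.index)
batch_mapper = {day_scan: idx for idx, day_scan in enumerate(unique_day_scans)}

print([batch_mapper[ds] for ds in day_and_scans])  # [0, 0, 1, 2, 1]
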
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -23,7 +23,7 @@ ruff = "^0.0.292"
coverage = "^7.3.2"

[tool.poetry.scripts]
-batchee_harmony = 'batcher.harmony_cli:main'
+batchee_harmony = 'batcher.harmony.cli:main'
batchee = 'batcher.tempo_filename_parser:main'

[build-system]
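With the entry point renamed above, batchee_harmony now resolves to batcher.harmony.cli:main once the package is installed. As a small illustrative check (not part of this PR), the installed console scripts can be inspected with importlib.metadata:

from importlib.metadata import entry_points

# Console scripts installed by this package (names taken from pyproject.toml above)
scripts = {ep.name: ep.value for ep in entry_points(group="console_scripts")}
print(scripts.get("batchee_harmony"))  # expected: 'batcher.harmony.cli:main'
print(scripts.get("batchee"))          # expected: 'batcher.tempo_filename_parser:main'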