change output of process_catalogs from single Catalog to list of Catalogs
danielfromearth committed Nov 8, 2023
1 parent 6095e05 commit ca6f485
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions batcher/harmony/service_adapter.py
@@ -7,8 +7,8 @@
 from pystac.item import Asset
 
 from batcher.harmony.util import (
+    _get_item_url,
-    _get_netcdf_urls,
     _get_output_bounding_box,
     _get_output_date_range,
 )
 from batcher.tempo_filename_parser import get_batch_indices
@@ -48,21 +48,20 @@ def invoke(self):
 
         return self.message, self.process_catalog(self.catalog)
 
-    def process_catalog(self, catalog: pystac.Catalog):
+    def process_catalog(self, catalog: pystac.Catalog) -> list[pystac.Catalog]:
"""Converts a list of STAC catalogs into a list of lists of STAC catalogs."""
self.logger.info("process_catalog() started.")
try:
result = catalog.clone()
result.id = str(uuid4())
result.clear_children()

# Get all the items from the catalog, including from child or linked catalogs
items = list(self.get_all_catalog_items(catalog))

self.logger.info(f"length of items==={len(items)}.")

# Quick return if catalog contains no items
if len(items) == 0:
result = catalog.clone()
result.id = str(uuid4())
result.clear_children()
return result

# # --- Get granule filepaths (urls) ---
@@ -79,38 +78,43 @@ def process_catalog(self, catalog: pystac.Catalog):
             for k, v in zip(batch_indices, items):
                 grouped.setdefault(k, []).append(v)
 
-            # --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan),
-            # and each Item holds multiple Assets (which represent each granule).
-            result.clear_items()
-
+            # --- Construct a list of STAC Catalogs (which represent each TEMPO scan),
+            # and each Catalog holds multiple Items (which represent each granule).
+            catalogs = []
             for batch_id, batch_items in grouped.items():
-                batch_urls: list[str] = _get_netcdf_urls(batch_items)
-                bounding_box = _get_output_bounding_box(batch_items)
-                properties = _get_output_date_range(batch_items)
-
-                self.logger.info(f"constructing new pystac.Item for batch_id==={batch_id}.")
-
-                # Construct a new pystac.Item with every granule in the batch as a pystac.Asset
-                output_item = Item(
-                    str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
-                )
+                self.logger.info(f"constructing new pystac.Catalog for batch_id==={batch_id}.")
+                # Initialize a new, empty Catalog
+                batch_catalog = catalog.clone()
+                batch_catalog.id = str(uuid4())
+                batch_catalog.clear_children()
+                batch_catalog.clear_items()
 
                 for idx, item in enumerate(batch_items):
+                    # Construct a new pystac.Item for each granule in the batch
+                    output_item = Item(
+                        str(uuid4()),
+                        bbox_to_geometry(item.bbox),
+                        item.bbox,
+                        None,
+                        _get_output_date_range([item]),
+                    )
                     output_item.add_asset(
                         f"data_{idx}",
                         Asset(
-                            batch_urls[idx],
-                            title=batch_urls[idx],
+                            _get_item_url(item),
+                            title=_get_item_url(item),
                             media_type="application/x-netcdf4",
                             roles=["data"],
                         ),
                     )
+                    batch_catalog.add_item(output_item)
 
-                result.add_item(output_item)
self.logger.info("STAC catalog creation for batch_id==={batch_id} complete.")
+                catalogs.append(batch_catalog)
 
-            self.logger.info("STAC catalog creation complete.")
+            self.logger.info("All STAC catalogs are complete.")
 
-            return result
+            return catalogs
 
         except Exception as service_exception:
             self.logger.error(service_exception, exc_info=1)
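The batching step above hinges on the `dict.setdefault` idiom, which collects granule items under their batch index. Below is a minimal, self-contained sketch of that idiom; the indices and granule labels are made up, and plain strings stand in for `pystac.Item` objects (in the service, the indices presumably come from parsing TEMPO filenames via `get_batch_indices`).

```python
# Hypothetical batch indices and granule labels, not taken from the repository.
batch_indices = [0, 0, 1, 1, 2]
items = ["granule_a", "granule_b", "granule_c", "granule_d", "granule_e"]

grouped: dict[int, list[str]] = {}
for k, v in zip(batch_indices, items):
    # setdefault creates the empty list the first time a batch index is seen.
    grouped.setdefault(k, []).append(v)

print(grouped)
# {0: ['granule_a', 'granule_b'], 1: ['granule_c', 'granule_d'], 2: ['granule_e']}
```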

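Since `process_catalog()` now returns `list[pystac.Catalog]` rather than a single catalog, downstream code receives one catalog per TEMPO scan batch as the second element of the tuple returned by `invoke()`. A hedged sketch of how a hypothetical caller might walk that list with pystac (the function name and prints are illustrative, not from this repository):

```python
import pystac

def summarize_batches(catalogs: list[pystac.Catalog]) -> None:
    """Print the granule assets held by each per-batch catalog (illustrative only)."""
    for batch_catalog in catalogs:
        items = list(batch_catalog.get_items())
        print(f"catalog {batch_catalog.id}: {len(items)} granule item(s)")
        for item in items:
            for key, asset in item.assets.items():
                print(f"  {key}: {asset.href}")
```

Each item in a batch catalog carries a single `data_*` asset whose href is the granule's NetCDF URL, so the per-batch file lists can be recovered directly from the returned catalogs.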