diff --git a/CHANGELOG.md b/CHANGELOG.md index a5d4133..e9aadb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added + - Issue 225 - Create one track ingest table per feature type - Issue 222 - Add operations to load granule Lambda to write granule record to track ingest database - Issue 201 - Create table for tracking granule ingest status - Issue 198 - Implement track ingest lambda function CMR and Hydrocron queries diff --git a/hydrocron/db/load_data.py b/hydrocron/db/load_data.py index 867c68d..b9d8e33 100755 --- a/hydrocron/db/load_data.py +++ b/hydrocron/db/load_data.py @@ -45,15 +45,19 @@ def lambda_handler(event, _): # noqa: E501 # pylint: disable=W0613 match table_name: case constants.SWOT_REACH_TABLE_NAME: collection_shortname = constants.SWOT_REACH_COLLECTION_NAME + track_table = constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME feature_type = 'Reach' case constants.SWOT_NODE_TABLE_NAME: collection_shortname = constants.SWOT_NODE_COLLECTION_NAME + track_table = constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME feature_type = 'Node' case constants.SWOT_PRIOR_LAKE_TABLE_NAME: collection_shortname = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME + track_table = constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME feature_type = 'LakeSP_prior' case constants.DB_TEST_TABLE_NAME: collection_shortname = constants.SWOT_REACH_COLLECTION_NAME + track_table = constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME feature_type = 'Reach' case _: raise MissingTable(f"Hydrocron table '{table_name}' does not exist.") @@ -85,6 +89,7 @@ def lambda_handler(event, _): # noqa: E501 # pylint: disable=W0613 if feature_type in granule_path: event2 = ('{"body": {"granule_path": "' + granule_path + '","table_name": "' + table_name + + '","track_table": "' + track_table + '","checksum": "' + checksum + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') @@ -103,6 +108,7 @@ def granule_handler(event, _): """ granule_path = event['body']['granule_path'] table_name = event['body']['table_name'] + track_table = event['body']['track_table'] load_benchmarking_data = event['body']['load_benchmarking_data'] @@ -154,7 +160,7 @@ def granule_handler(event, _): "checksum": checksum, "status": "to_ingest" }] - load_data(dynamo_resource, table_name=constants.TRACK_INGEST_TABLE_NAME, items=track_ingest_record) + load_data(dynamo_resource, table_name=track_table, items=track_ingest_record) logging.info("Begin loading data from granule: %s", os.path.basename(granule_path)) load_data(dynamo_resource, table_name, items) @@ -183,6 +189,7 @@ def cnm_handler(event, _): if 'Reach' in granule_uri: event2 = ('{"body": {"granule_path": "' + granule_uri + '","table_name": "' + constants.SWOT_REACH_TABLE_NAME + + '","track_table": "' + constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME + '","checksum": "' + checksum + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') @@ -197,6 +204,7 @@ def cnm_handler(event, _): if 'Node' in granule_uri: event2 = ('{"body": {"granule_path": "' + granule_uri + '","table_name": "' + constants.SWOT_NODE_TABLE_NAME + + '","track_table": "' + constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME + '","checksum": "' + checksum + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') @@ -211,6 +219,7 @@ def cnm_handler(event, _): if 'LakeSP_Prior' in granule_uri: event2 = ('{"body": {"granule_path": "' + granule_uri + '","table_name": "' + constants.SWOT_PRIOR_LAKE_TABLE_NAME + + '","track_table": "' + constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME + '","checksum": "' + checksum + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') @@ -329,8 +338,14 @@ def load_data(dynamo_resource, table_name, items): case constants.SWOT_PRIOR_LAKE_TABLE_NAME: feature_name = 'prior_lake' feature_id = 'lake_id' - case constants.TRACK_INGEST_TABLE_NAME: - feature_name = 'track ingest' + case constants.SWOT_REACH_TRACK_INGEST_TABLE_NAME: + feature_name = 'track ingest reaches' + feature_id = 'granuleUR' + case constants.SWOT_NODE_TRACK_INGEST_TABLE_NAME: + feature_name = 'track ingest nodes' + feature_id = 'granuleUR' + case constants.SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME: + feature_name = 'track ingest prior lakes' feature_id = 'granuleUR' case _: logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name) diff --git a/hydrocron/utils/constants.py b/hydrocron/utils/constants.py index 25f86bd..411acc2 100644 --- a/hydrocron/utils/constants.py +++ b/hydrocron/utils/constants.py @@ -128,7 +128,9 @@ SWOT_REACH_TABLE_NAME = "hydrocron-swot-reach-table" SWOT_NODE_TABLE_NAME = "hydrocron-swot-node-table" SWOT_PRIOR_LAKE_TABLE_NAME = "hydrocron-swot-prior-lake-table" -TRACK_INGEST_TABLE_NAME = "hydrocron-track-ingest-table" +SWOT_REACH_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-reach-track-ingest-table" +SWOT_NODE_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-node-track-ingest-table" +SWOT_PRIOR_LAKE_TRACK_INGEST_TABLE_NAME = "hydrocron-swot-prior-lake-track-ingest-table" SWOT_REACH_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0" SWOT_NODE_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0" diff --git a/terraform/hydrocron-dynamo.tf b/terraform/hydrocron-dynamo.tf index 333d196..fdae86c 100644 --- a/terraform/hydrocron-dynamo.tf +++ b/terraform/hydrocron-dynamo.tf @@ -87,8 +87,62 @@ resource "aws_dynamodb_table" "hydrocron-swot-prior-lake-table" { } } -resource "aws_dynamodb_table" "hydrocron-track-ingest-table" { - name = "hydrocron-track-ingest-table" +resource "aws_dynamodb_table" "hydrocron-reach-track-ingest-table" { + name = "hydrocron-swot-reach-track-ingest-table" + billing_mode = "PAY_PER_REQUEST" + hash_key = "granuleUR" + range_key = "revision_date" + attribute { + name = "granuleUR" + type = "S" + } + attribute { + name = "revision_date" + type = "S" + } + attribute { + name = "status" + type = "S" + } + global_secondary_index { + name = "statusIndex" + hash_key = "status" + projection_type = "ALL" + } + point_in_time_recovery { + enabled = var.stage == "ops" ? true : false + } +} + +resource "aws_dynamodb_table" "hydrocron-node-track-ingest-table" { + name = "hydrocron-swot-node-track-ingest-table" + billing_mode = "PAY_PER_REQUEST" + hash_key = "granuleUR" + range_key = "revision_date" + attribute { + name = "granuleUR" + type = "S" + } + attribute { + name = "revision_date" + type = "S" + } + attribute { + name = "status" + type = "S" + } + global_secondary_index { + name = "statusIndex" + hash_key = "status" + projection_type = "ALL" + } + point_in_time_recovery { + enabled = var.stage == "ops" ? true : false + } +} + +resource "aws_dynamodb_table" "hydrocron-priorlake-track-ingest-table" { + name = "hydrocron-swot-prior-lake-track-ingest-table" billing_mode = "PAY_PER_REQUEST" hash_key = "granuleUR" range_key = "revision_date" diff --git a/terraform/hydrocron-iam.tf b/terraform/hydrocron-iam.tf index 3dab84e..bbee64e 100644 --- a/terraform/hydrocron-iam.tf +++ b/terraform/hydrocron-iam.tf @@ -67,7 +67,9 @@ data "aws_iam_policy_document" "dynamo-read-policy-track-ingest" { ] resources = [ - aws_dynamodb_table.hydrocron-track-ingest-table.arn, + aws_dynamodb_table.hydrocron-reach-track-ingest-table.arn, + aws_dynamodb_table.hydrocron-node-track-ingest-table.arn, + aws_dynamodb_table.hydrocron-priorlake-track-ingest-table.arn, ] } @@ -119,7 +121,9 @@ data "aws_iam_policy_document" "dynamo-write-policy-track-ingest" { ] resources = [ - aws_dynamodb_table.hydrocron-track-ingest-table.arn + aws_dynamodb_table.hydrocron-reach-track-ingest-table.arn, + aws_dynamodb_table.hydrocron-node-track-ingest-table.arn, + aws_dynamodb_table.hydrocron-priorlake-track-ingest-table.arn, ] }