diff --git a/CHANGELOG.md b/CHANGELOG.md index bd850fc..a5d4133 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added + - Issue 222 - Add operations to load granule Lambda to write granule record to track ingest database - Issue 201 - Create table for tracking granule ingest status - Issue 198 - Implement track ingest lambda function CMR and Hydrocron queries - Issue 193 - Add new Dynamo table for prior lake data diff --git a/hydrocron/db/load_data.py b/hydrocron/db/load_data.py index 257204a..867c68d 100755 --- a/hydrocron/db/load_data.py +++ b/hydrocron/db/load_data.py @@ -69,10 +69,24 @@ def lambda_handler(event, _): # noqa: E501 # pylint: disable=W0613 for granule in new_granules: granule_path = granule.data_links(access='direct')[0] + logging.info('Granule: %s', granule_path) + try: + checksum = granule['umm']['Checksum']['Value'] + except KeyError: + checksum = "Not Found" + logging.info('No UMM checksum') + + try: + revision_date = [date["Date"] for date in granule["umm"]["ProviderDates"] if "Update" in date["Type"]] + except KeyError: + revision_date = "Not Found" + logging.info('No UMM revision date') if feature_type in granule_path: event2 = ('{"body": {"granule_path": "' + granule_path + '","table_name": "' + table_name + + '","checksum": "' + checksum + + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') logging.info("Invoking granule load lambda with event json %s", str(event2)) @@ -92,6 +106,18 @@ def granule_handler(event, _): load_benchmarking_data = event['body']['load_benchmarking_data'] + try: + checksum = event['body']['checksum'] + except KeyError: + checksum = "Not Found" + logging.info('No CNM checksum') + + try: + revision_date = event['body']['revisionDate'] + except KeyError: + revision_date = "Not Found" + logging.info('No CNM revision date') + if ("Reach" in granule_path) & (table_name != constants.SWOT_REACH_TABLE_NAME): raise TableMisMatch(f"Error: Cannot load Reach data into table: '{table_name}'") @@ -118,6 +144,18 @@ def granule_handler(event, _): logging.info("Set up dynamo connection") dynamo_resource = connection.dynamodb_resource + + logging.info("Adding granule to track ingest table") + track_ingest_record = [{ + "granuleUR": os.path.basename(granule_path), + "revision_date": revision_date, + "expected_feature_count": len(items), + "actual_feature_count": 0, + "checksum": checksum, + "status": "to_ingest" + }] + load_data(dynamo_resource, table_name=constants.TRACK_INGEST_TABLE_NAME, items=track_ingest_record) + logging.info("Begin loading data from granule: %s", os.path.basename(granule_path)) load_data(dynamo_resource, table_name, items) @@ -133,16 +171,20 @@ def cnm_handler(event, _): # Parse message for message in event['Records']: cnm = json.loads(message['Sns']['Message']) + revision_date = cnm['submissionTime'] logging.info("Begin processing message %s", str(cnm)) for files in cnm['product']['files']: if files['type'] == 'data': granule_uri = files['uri'] + checksum = files['checksum'] if 'Reach' in granule_uri: event2 = ('{"body": {"granule_path": "' + granule_uri + '","table_name": "' + constants.SWOT_REACH_TABLE_NAME + + '","checksum": "' + checksum + + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') logging.info("Invoking granule load lambda with event json %s", str(event2)) @@ -155,6 +197,8 @@ def cnm_handler(event, _): if 'Node' in granule_uri: event2 = ('{"body": {"granule_path": "' + granule_uri + '","table_name": "' + constants.SWOT_NODE_TABLE_NAME + + '","checksum": "' + checksum + + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') logging.info("Invoking granule load lambda with event json %s", str(event2)) @@ -167,6 +211,8 @@ def cnm_handler(event, _): if 'LakeSP_Prior' in granule_uri: event2 = ('{"body": {"granule_path": "' + granule_uri + '","table_name": "' + constants.SWOT_PRIOR_LAKE_TABLE_NAME + + '","checksum": "' + checksum + + '","revisionDate": "' + revision_date + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') logging.info("Invoking granule load lambda with event json %s", str(event2)) @@ -283,6 +329,9 @@ def load_data(dynamo_resource, table_name, items): case constants.SWOT_PRIOR_LAKE_TABLE_NAME: feature_name = 'prior_lake' feature_id = 'lake_id' + case constants.TRACK_INGEST_TABLE_NAME: + feature_name = 'track ingest' + feature_id = 'granuleUR' case _: logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name) diff --git a/hydrocron/utils/constants.py b/hydrocron/utils/constants.py index 93cc789..25f86bd 100644 --- a/hydrocron/utils/constants.py +++ b/hydrocron/utils/constants.py @@ -128,6 +128,7 @@ SWOT_REACH_TABLE_NAME = "hydrocron-swot-reach-table" SWOT_NODE_TABLE_NAME = "hydrocron-swot-node-table" SWOT_PRIOR_LAKE_TABLE_NAME = "hydrocron-swot-prior-lake-table" +TRACK_INGEST_TABLE_NAME = "hydrocron-track-ingest-table" SWOT_REACH_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0" SWOT_NODE_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0" diff --git a/terraform/hydrocron-iam.tf b/terraform/hydrocron-iam.tf index a74d821..3dab84e 100644 --- a/terraform/hydrocron-iam.tf +++ b/terraform/hydrocron-iam.tf @@ -44,7 +44,9 @@ data "aws_iam_policy_document" "dynamo-read-policy" { aws_dynamodb_table.hydrocron-swot-node-table.arn, "${aws_dynamodb_table.hydrocron-swot-node-table.arn}/index/*", aws_dynamodb_table.hydrocron-swot-reach-table.arn, - "${aws_dynamodb_table.hydrocron-swot-reach-table.arn}/index/*" + "${aws_dynamodb_table.hydrocron-swot-reach-table.arn}/index/*", + aws_dynamodb_table.hydrocron-swot-prior-lake-table.arn, + "${aws_dynamodb_table.hydrocron-swot-prior-lake-table.arn}/index/*" ] } @@ -381,6 +383,10 @@ resource "aws_iam_role" "hydrocron-lambda-load-granule-role" { name = "HydrocronDynamoWrite" policy = data.aws_iam_policy_document.dynamo-write-policy.json } + inline_policy { + name = "HydrocronTrackIngestDynamoWrite" + policy = data.aws_iam_policy_document.dynamo-write-policy-track-ingest.json + } inline_policy { name = "HydrocronS3Read" policy = data.aws_iam_policy_document.s3-read-policy.json