Skip to content

Commit

Permalink
Feature/issue 222 - Add granule info to track ingest table on load (#223)
Browse files Browse the repository at this point in the history

* adjust lambdas to populate track ingest table on granule load

* changelog

* remove test cnm

* lint

* change error caught when handling checksum

* update lambda role permissions to write to track ingest table

* fix typo on lake table terraform

* set default fill values for checksum and rev date in track status

* fix checksum handling in bulk load data

* lint

* add logging to debug
  • Loading branch information
torimcd authored Aug 19, 2024
1 parent 5468664 commit 70613e5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Issue 222 - Add operations to load granule Lambda to write granule record to track ingest database
- Issue 201 - Create table for tracking granule ingest status
- Issue 198 - Implement track ingest lambda function CMR and Hydrocron queries
- Issue 193 - Add new Dynamo table for prior lake data
Expand Down
49 changes: 49 additions & 0 deletions hydrocron/db/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,24 @@ def lambda_handler(event, _): # noqa: E501 # pylint: disable=W0613

for granule in new_granules:
granule_path = granule.data_links(access='direct')[0]
logging.info('Granule: %s', granule_path)
try:
checksum = granule['umm']['Checksum']['Value']
except KeyError:
checksum = "Not Found"
logging.info('No UMM checksum')

try:
revision_date = [date["Date"] for date in granule["umm"]["ProviderDates"] if "Update" in date["Type"]]
except KeyError:
revision_date = "Not Found"
logging.info('No UMM revision date')

if feature_type in granule_path:
event2 = ('{"body": {"granule_path": "' + granule_path
+ '","table_name": "' + table_name
+ '","checksum": "' + checksum
+ '","revisionDate": "' + revision_date
+ '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')

logging.info("Invoking granule load lambda with event json %s", str(event2))
Expand All @@ -92,6 +106,18 @@ def granule_handler(event, _):

load_benchmarking_data = event['body']['load_benchmarking_data']

try:
checksum = event['body']['checksum']
except KeyError:
checksum = "Not Found"
logging.info('No CNM checksum')

try:
revision_date = event['body']['revisionDate']
except KeyError:
revision_date = "Not Found"
logging.info('No CNM revision date')

if ("Reach" in granule_path) & (table_name != constants.SWOT_REACH_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Reach data into table: '{table_name}'")

Expand All @@ -118,6 +144,18 @@ def granule_handler(event, _):

logging.info("Set up dynamo connection")
dynamo_resource = connection.dynamodb_resource

logging.info("Adding granule to track ingest table")
track_ingest_record = [{
"granuleUR": os.path.basename(granule_path),
"revision_date": revision_date,
"expected_feature_count": len(items),
"actual_feature_count": 0,
"checksum": checksum,
"status": "to_ingest"
}]
load_data(dynamo_resource, table_name=constants.TRACK_INGEST_TABLE_NAME, items=track_ingest_record)

logging.info("Begin loading data from granule: %s", os.path.basename(granule_path))
load_data(dynamo_resource, table_name, items)

Expand All @@ -133,16 +171,20 @@ def cnm_handler(event, _):
# Parse message
for message in event['Records']:
cnm = json.loads(message['Sns']['Message'])
revision_date = cnm['submissionTime']

logging.info("Begin processing message %s", str(cnm))

for files in cnm['product']['files']:
if files['type'] == 'data':
granule_uri = files['uri']
checksum = files['checksum']

if 'Reach' in granule_uri:
event2 = ('{"body": {"granule_path": "' + granule_uri
+ '","table_name": "' + constants.SWOT_REACH_TABLE_NAME
+ '","checksum": "' + checksum
+ '","revisionDate": "' + revision_date
+ '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')

logging.info("Invoking granule load lambda with event json %s", str(event2))
Expand All @@ -155,6 +197,8 @@ def cnm_handler(event, _):
if 'Node' in granule_uri:
event2 = ('{"body": {"granule_path": "' + granule_uri
+ '","table_name": "' + constants.SWOT_NODE_TABLE_NAME
+ '","checksum": "' + checksum
+ '","revisionDate": "' + revision_date
+ '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')

logging.info("Invoking granule load lambda with event json %s", str(event2))
Expand All @@ -167,6 +211,8 @@ def cnm_handler(event, _):
if 'LakeSP_Prior' in granule_uri:
event2 = ('{"body": {"granule_path": "' + granule_uri
+ '","table_name": "' + constants.SWOT_PRIOR_LAKE_TABLE_NAME
+ '","checksum": "' + checksum
+ '","revisionDate": "' + revision_date
+ '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')

logging.info("Invoking granule load lambda with event json %s", str(event2))
Expand Down Expand Up @@ -283,6 +329,9 @@ def load_data(dynamo_resource, table_name, items):
case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
feature_name = 'prior_lake'
feature_id = 'lake_id'
case constants.TRACK_INGEST_TABLE_NAME:
feature_name = 'track ingest'
feature_id = 'granuleUR'
case _:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)

Expand Down
1 change: 1 addition & 0 deletions hydrocron/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
SWOT_REACH_TABLE_NAME = "hydrocron-swot-reach-table"
SWOT_NODE_TABLE_NAME = "hydrocron-swot-node-table"
SWOT_PRIOR_LAKE_TABLE_NAME = "hydrocron-swot-prior-lake-table"
TRACK_INGEST_TABLE_NAME = "hydrocron-track-ingest-table"

SWOT_REACH_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0"
SWOT_NODE_COLLECTION_NAME = "SWOT_L2_HR_RiverSP_2.0"
Expand Down
8 changes: 7 additions & 1 deletion terraform/hydrocron-iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ data "aws_iam_policy_document" "dynamo-read-policy" {
aws_dynamodb_table.hydrocron-swot-node-table.arn,
"${aws_dynamodb_table.hydrocron-swot-node-table.arn}/index/*",
aws_dynamodb_table.hydrocron-swot-reach-table.arn,
"${aws_dynamodb_table.hydrocron-swot-reach-table.arn}/index/*"
"${aws_dynamodb_table.hydrocron-swot-reach-table.arn}/index/*",
aws_dynamodb_table.hydrocron-swot-prior-lake-table.arn,
"${aws_dynamodb_table.hydrocron-swot-prior-lake-table.arn}/index/*"
]
}

Expand Down Expand Up @@ -381,6 +383,10 @@ resource "aws_iam_role" "hydrocron-lambda-load-granule-role" {
name = "HydrocronDynamoWrite"
policy = data.aws_iam_policy_document.dynamo-write-policy.json
}
inline_policy {
name = "HydrocronTrackIngestDynamoWrite"
policy = data.aws_iam_policy_document.dynamo-write-policy-track-ingest.json
}
inline_policy {
name = "HydrocronS3Read"
policy = data.aws_iam_policy_document.s3-read-policy.json
Expand Down

0 comments on commit 70613e5

Please sign in to comment.