Skip to content

Commit

Permalink
Merge pull request #90 from planetlabs/fix-store-latest
Browse files Browse the repository at this point in the history
Fix store latest
  • Loading branch information
ABPLMC authored Jun 28, 2024
2 parents 0e33ed6 + f3cdb7d commit 8771eae
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
35 changes: 18 additions & 17 deletions ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,19 @@
class DynamoDBStorage(object):
'''store datalake records in a dynamoDB table'''

def __init__(self, table_name=None, latest_table_name=None,
             connection=None):
    '''Set up storage for a records table and an optional latest table.

    table_name: name of the DynamoDB table that records are stored in.
    latest_table_name: name of the table written by store_latest(). When
        it is None (or empty), store() skips the latest-table write.
    connection: passed straight through to _prepare_connection().
    '''
    self.table_name = table_name
    # Taken verbatim from the argument; reading DATALAKE_LATEST_TABLE from
    # the environment is the job of from_config(), not the constructor.
    self.latest_table_name = latest_table_name
    self._prepare_connection(connection)
    self.logger = logging.getLogger('storage')

@classmethod
def from_config(cls):
    '''Build a storage instance from environment variables.

    Reads DATALAKE_DYNAMODB_TABLE (required) and DATALAKE_LATEST_TABLE
    (optional; None when unset) and passes both to the constructor.

    Raises InsufficientConfiguration when DATALAKE_DYNAMODB_TABLE is not
    set.
    '''
    table_name = os.environ.get('DATALAKE_DYNAMODB_TABLE')
    latest_table_name = os.environ.get("DATALAKE_LATEST_TABLE")
    if table_name is None:
        raise InsufficientConfiguration('Please specify a dynamodb table')
    # Single return that forwards the latest-table name as well; returning
    # cls(table_name) alone would silently drop it.
    return cls(table_name, latest_table_name)

def _prepare_connection(self, connection):
region = os.environ.get('AWS_REGION')
Expand All @@ -59,14 +58,13 @@ def _latest_table(self):
return Table(self.latest_table_name, connection=self._connection)

def store(self, record):
    '''Store a record, and also record it as latest when configured.

    The record is always put into the main table first. A
    ConditionalCheckFailedException from that put is treated as a
    duplicate store and ignored. Afterwards, if latest_table_name is set,
    the same record is handed to store_latest().
    '''
    try:
        self._table.put_item(data=record)
    except ConditionalCheckFailedException:
        # Tolerate duplicate stores
        pass
    # store_latest() is a method of this class, not of the Table object,
    # so it is called on self.
    if self.latest_table_name:
        self.store_latest(record)

def update(self, record):
self._table.put_item(data=record, overwrite=True)
Expand All @@ -76,11 +74,16 @@ def store_latest(self, record):
note: Record must utilize AttributeValue syntax
for the conditional put.
"""

condition_expression = " attribute_not_exists(what_where_key) OR metadata.start < :new_start"
expression_attribute_values = {
':new_start': {'N': str(record['metadata']['start'])}
}

if record['metadata']['work_id'] is None:
work_id_value = {'NULL': True}
else:
work_id_value = {'S': str(record['metadata']['work_id'])}

record = {
'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']},
'time_index_key': {"S": record['time_index_key']},
Expand Down Expand Up @@ -111,9 +114,7 @@ def store_latest(self, record):
'where': {
'S': str(record['metadata']['where'])
},
'work_id': {
'S': str(record['metadata']['work_id'])
}
'work_id': work_id_value
}
},
'url': {"S": record['url']},
Expand Down
15 changes: 15 additions & 0 deletions ingester/tests/test_ingester.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import time
import decimal
from decimal import Decimal
import json

Expand Down Expand Up @@ -32,6 +33,20 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker):
for r in records:
assert r['metadata'] == metadata

def test_ingest_random_latest(storage, dynamodb_latest_table,
                              random_s3_file_maker):
    '''Ingest a random file with the latest table enabled and scan it.'''
    storage.latest_table_name = 'latest'
    url, metadata = random_s3_file_maker()
    ingester = Ingester(storage)
    ingester.ingest(url)
    records = [dict(r) for r in dynamodb_latest_table.scan()]

    # Check for at least one record before anything indexes records[0], so
    # an empty scan fails with this assertion rather than an IndexError.
    assert len(records) >= 1

    def convert_records(records):
        # Only the first record is converted: top-level int/float values
        # become Decimal, every other value is kept as-is.
        return {
            k: (decimal.Decimal(str(v)) if isinstance(v, (int, float)) else v)
            for k, v in records[0].items()
        }

    converted_records = convert_records(records)
    # NOTE(review): the expected value is derived from the scanned records
    # themselves, and `metadata` returned by random_s3_file_maker is never
    # used, so this only checks that every scanned record agrees with the
    # first one. Confirm whether it should compare against `metadata`.
    for r in records:
        assert r['metadata'] == converted_records['metadata']


def test_ingest_no_end(storage, dynamodb_records_table, s3_file_from_metadata,
random_metadata):
Expand Down
6 changes: 4 additions & 2 deletions ingester/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def test_store_duplicate(dynamodb_users_table, dynamodb_connection):
assert dict(user) == expected_user

def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
storage = DynamoDBStorage(connection=dynamodb_connection)
storage.latest_table_name = 'latest'

new_record = {
'what_where_key': 'syslog:ground_server2',
Expand Down Expand Up @@ -51,7 +52,8 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection):


def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(latest_table='latest', connection=dynamodb_connection)
storage = DynamoDBStorage(connection=dynamodb_connection)
storage.latest_table_name = 'latest'

file1 = {
'what_where_key': 'syslog:ground_server2',
Expand Down

0 comments on commit 8771eae

Please sign in to comment.