diff --git a/api/datalake_api/querier.py b/api/datalake_api/querier.py index fd8a51c..49516ff 100644 --- a/api/datalake_api/querier.py +++ b/api/datalake_api/querier.py @@ -18,7 +18,6 @@ import base64 import json import time -import os from datetime import datetime, timedelta import decimal diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 3dc84c9..ea51033 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -606,7 +606,6 @@ def test_query_latest_future_record_exceeds_lookforward(table_maker, querier, re record = record_maker(what='meow', where='tree', start=future_start, end=future_end) default_table, latest_table = table_maker([]) - print(default_table.__dict__, type(default_table)) default_table.put_item(Item=record[0]) latest_table.put_item(Item=record[0]) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 092f6a5..81838be 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -76,7 +76,7 @@ def store_latest(self, record): Record must utilize AttributeValue syntax for the conditional put. """ - condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start < :new_start" + condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start" expression_attribute_values = { ':new_start': {'N': str(record['metadata']['start'])} } diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index a697b79..0aace6e 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -40,10 +40,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): 'create_time': 1500000000000 } - try: - storage.store_latest(new_record) - except Exception as e: - print(f"Failed to store record: {str(e)}") + storage.store_latest(new_record) stored_record = dynamodb_latest_table.get_item( what_where_key=new_record['what_where_key'] @@ -51,10 +48,7 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): assert stored_record['metadata']['start'] == new_record['metadata']['start'] -def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection): - storage = DynamoDBStorage(connection=dynamodb_connection) - storage.latest_table_name = 'latest' - +def provide_test_records(): file1 = { 'what_where_key': 'syslog:ground_server2', 'time_index_key': '15219:zlcdzvawsp', @@ -67,7 +61,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'work_id': 'abc-123', 'where': 'ground_server2', 'what': 'syslog', - 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'id': 'file1', 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' }, 'url': 's3://existingfile/url', @@ -86,7 +80,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'work_id': 'abc-123', 'where': 'ground_server2', 'what': 'syslog', - 'id': '45gb2d1ec54245c7a57e29ed5a6ea9b2', + 'id': 'file2', 'hash': 'c5g3d8de24af342643d5b78a8f2b9b88' }, @@ -106,7 +100,7 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'work_id': 'foo-bizz', 'where': 's114', 'what': 'syslog', - 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'id': 'file3', 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' }, 'url': 's3://datalake/path_to_file1', @@ -114,6 +108,14 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna 'size': 1048576 } + return (file1, file2, file3) + + +def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage(connection=dynamodb_connection) + storage.latest_table_name = 'latest' + file1, file2, file3 = provide_test_records() + storage.store_latest(file3) storage.store_latest(file1) storage.store_latest(file2) # same what:where, but should replace file1 b/c newer @@ -127,19 +129,70 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna assert len(records) == 2 assert file2 == res + +def test_store_conditional_put_newest_first(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage(connection=dynamodb_connection) + storage.latest_table_name = 'latest' + file1, file2, file3 = provide_test_records() + storage.store_latest(file3) storage.store_latest(file2) storage.store_latest(file1) - records = [dict(i) for i in dynamodb_latest_table.scan()] + query_what_where = 'syslog:ground_server2' + res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) assert res['metadata']['id'] != file1['metadata']['id'] assert res['metadata']['id'] == file2['metadata']['id'] - - storage.store_latest(file1) - storage.store_latest(file1) - storage.store_latest(file2) - storage.store_latest(file3) - res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) - assert res['metadata']['start'] == file2['metadata']['start'] + +def test_verify_replace_record_same_start(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage(connection=dynamodb_connection) + storage.latest_table_name = 'latest' + + record1 = { + 'what_where_key': 'syslog:ground_server2', + 'time_index_key': '15225:newlog', + 'range_key': 'new_server:12345abcde', + 'metadata': { + 'version': 1, + 'start': 1500000000000, + 'end': 1500000000010, + 'path': '/var/log/syslog.2', + 'work_id': None, + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' + }, + 'url': 's3://newfile/url', + 'create_time': 1500000000000 + } + + record2 = { + 'what_where_key': 'syslog:ground_server2', + 'time_index_key': '15225:newlog', + 'range_key': 'new_server:12345abcde', + 'metadata': { + 'version': 1, + 'start': 1500000000000, + 'end': 1500000000010, + 'path': '/var/log/syslog.2', + 'work_id': None, + 'where': 'ground_server2', + 'what': 'syslog', + 'id': 'abc123', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' + }, + 'url': 's3://newfile/url', + 'create_time': 1500000000000 + } + + storage.store_latest(record1) + storage.store_latest(record2) + + stored_record = dynamodb_latest_table.get_item( + what_where_key="syslog:ground_server2" + ) + assert stored_record['metadata']['start'] == record1['metadata']['start'] + assert stored_record['metadata']['id'] == "abc123"