diff --git a/api/tests/test_archive_querier.py b/api/tests/test_archive_querier.py index 3dc84c9..ea51033 100644 --- a/api/tests/test_archive_querier.py +++ b/api/tests/test_archive_querier.py @@ -606,7 +606,6 @@ def test_query_latest_future_record_exceeds_lookforward(table_maker, querier, re record = record_maker(what='meow', where='tree', start=future_start, end=future_end) default_table, latest_table = table_maker([]) - print(default_table.__dict__, type(default_table)) default_table.put_item(Item=record[0]) latest_table.put_item(Item=record[0]) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 092f6a5..81838be 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -76,7 +76,7 @@ def store_latest(self, record): Record must utilize AttributeValue syntax for the conditional put. """ - condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start < :new_start" + condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start" expression_attribute_values = { ':new_start': {'N': str(record['metadata']['start'])} } diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index a697b79..f916171 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -143,3 +143,58 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) assert res['metadata']['start'] == file2['metadata']['start'] + + +def test_insert_ingestion_issue(dynamodb_latest_table, dynamodb_connection): + storage = DynamoDBStorage(connection=dynamodb_connection) + storage.latest_table_name = 'latest' + + record1 = { + 'what_where_key': 'syslog:ground_server2', + 'time_index_key': '15225:newlog', + 'range_key': 'new_server:12345abcde', + 'metadata': { + 'version': 1, + 'start': 1500000000000, + 'end': 1500000000010, + 'path': '/var/log/syslog.2', + 'work_id': None, + 'where': 'ground_server2', + 'what': 'syslog', + 'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' + }, + 'url': 's3://newfile/url', + 'create_time': 1500000000000 + } + + record2 = { + 'what_where_key': 'syslog:ground_server2', + 'time_index_key': '15225:newlog', + 'range_key': 'new_server:12345abcde', + 'metadata': { + 'version': 1, + 'start': 1500000000000, + 'end': 1500000000010, + 'path': '/var/log/syslog.2', + 'work_id': None, + 'where': 'ground_server2', + 'what': 'syslog', + 'id': 'abc123', + 'hash': 'b4f2d8de24af342643d5b78a8f2b9b88' + }, + 'url': 's3://newfile/url', + 'create_time': 1500000000000 + } + + try: + storage.store_latest(record1) + storage.store_latest(record2) + except Exception as e: + print(f"Failed to store record: {str(e)}") + + stored_record = dynamodb_latest_table.get_item( + what_where_key="syslog:ground_server2" + ) + assert stored_record['metadata']['start'] == record1['metadata']['start'] + assert stored_record['metadata']['id'] == "abc123"