Skip to content

Commit

Permalink
Ingester: Conditional put utilizes less than or equal to for records.…
Browse files Browse the repository at this point in the history
… Resolves ingestion queue bug.
  • Loading branch information
ABPLMC committed Oct 4, 2024
1 parent 392760c commit 645ce00
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
1 change: 0 additions & 1 deletion api/tests/test_archive_querier.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,6 @@ def test_query_latest_future_record_exceeds_lookforward(table_maker, querier, re
record = record_maker(what='meow', where='tree', start=future_start, end=future_end)

default_table, latest_table = table_maker([])
print(default_table.__dict__, type(default_table))

default_table.put_item(Item=record[0])
latest_table.put_item(Item=record[0])
Expand Down
2 changes: 1 addition & 1 deletion ingester/datalake_ingester/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def store_latest(self, record):
Record must utilize AttributeValue syntax
for the conditional put.
"""
condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start < :new_start"
condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start"
expression_attribute_values = {
':new_start': {'N': str(record['metadata']['start'])}
}
Expand Down
55 changes: 55 additions & 0 deletions ingester/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,58 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna
res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where))
assert res['metadata']['start'] == file2['metadata']['start']



def test_insert_ingestion_issue(dynamodb_latest_table, dynamodb_connection):
storage = DynamoDBStorage(connection=dynamodb_connection)
storage.latest_table_name = 'latest'

record1 = {
'what_where_key': 'syslog:ground_server2',
'time_index_key': '15225:newlog',
'range_key': 'new_server:12345abcde',
'metadata': {
'version': 1,
'start': 1500000000000,
'end': 1500000000010,
'path': '/var/log/syslog.2',
'work_id': None,
'where': 'ground_server2',
'what': 'syslog',
'id': '34fb2d1ec54245c7a57e29ed5a6ea9b2',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://newfile/url',
'create_time': 1500000000000
}

record2 = {
'what_where_key': 'syslog:ground_server2',
'time_index_key': '15225:newlog',
'range_key': 'new_server:12345abcde',
'metadata': {
'version': 1,
'start': 1500000000000,
'end': 1500000000010,
'path': '/var/log/syslog.2',
'work_id': None,
'where': 'ground_server2',
'what': 'syslog',
'id': 'abc123',
'hash': 'b4f2d8de24af342643d5b78a8f2b9b88'
},
'url': 's3://newfile/url',
'create_time': 1500000000000
}

try:
storage.store_latest(record1)
storage.store_latest(record2)
except Exception as e:
print(f"Failed to store record: {str(e)}")

stored_record = dynamodb_latest_table.get_item(
what_where_key="syslog:ground_server2"
)
assert stored_record['metadata']['start'] == record1['metadata']['start']
assert stored_record['metadata']['id'] == "abc123"

0 comments on commit 645ce00

Please sign in to comment.