Skip to content

Commit

Permalink
fix clean node_output in RunLogger after NodeFinished/RunFinished #77
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-schick committed May 18, 2022
1 parent 4ec05ed commit 3281744
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion mara_pipelines/logging/run_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def handle_event(self, event: events.Event):
# will come in 1 sec (default, if not changed...)
print(f'Ignored problem on inserting system statistic events into the table: {e!r}', flush=True)
elif isinstance(event, pipeline_events.NodeFinished):
key = tuple(event.node_path)

with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor
cursor.execute(f'''
UPDATE data_integration_node_run
Expand All @@ -179,7 +181,9 @@ def handle_event(self, event: events.Event):
VALUES ''' + ','.join([cursor.mogrify('(%s,%s,%s,%s,%s)', (node_run_id, output_event.timestamp, output_event.message,
output_event.format, output_event.is_error))
.decode('utf-8')
for output_event in self.node_output.get(tuple(event.node_path))]))
for output_event in self.node_output.get(key)]))

del self.node_output[key]

elif isinstance(event, pipeline_events.RunFinished):
with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor
Expand All @@ -206,3 +210,5 @@ def handle_event(self, event: events.Event):
cursor.execute(f'''
DELETE FROM data_integration_system_statistics
WHERE timestamp + INTERVAL '{config.run_log_retention_in_days()} days' < current_timestamp;''')

self.node_output = None

0 comments on commit 3281744

Please sign in to comment.