diff --git a/mara_pipelines/logging/run_log.py b/mara_pipelines/logging/run_log.py index d40efd5..9c85c47 100644 --- a/mara_pipelines/logging/run_log.py +++ b/mara_pipelines/logging/run_log.py @@ -166,6 +166,8 @@ def handle_event(self, event: events.Event): # will come in 1 sec (default, if not changed...) print(f'Ignored problem on inserting system statistic events into the table: {e!r}', flush=True) elif isinstance(event, pipeline_events.NodeFinished): + key = tuple(event.node_path) + with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor cursor.execute(f''' UPDATE data_integration_node_run @@ -179,7 +181,9 @@ def handle_event(self, event: events.Event): VALUES ''' + ','.join([cursor.mogrify('(%s,%s,%s,%s,%s)', (node_run_id, output_event.timestamp, output_event.message, output_event.format, output_event.is_error)) .decode('utf-8') - for output_event in self.node_output.get(tuple(event.node_path))])) + for output_event in self.node_output.get(key)])) + + del self.node_output[key] elif isinstance(event, pipeline_events.RunFinished): with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor @@ -206,3 +210,5 @@ def handle_event(self, event: events.Event): cursor.execute(f''' DELETE FROM data_integration_system_statistics WHERE timestamp + INTERVAL '{config.run_log_retention_in_days()} days' < current_timestamp;''') + + self.node_output = None \ No newline at end of file