From 4ec05edf65b89b72c7d27838470f833cd588f576 Mon Sep 17 00:00:00 2001 From: Leo Schick <67712864+leo-schick@users.noreply.github.com> Date: Fri, 13 May 2022 14:45:21 +0200 Subject: [PATCH] add a simple pipeline unit test without mara db (#71) add a simple pipeline unit test without mara db --- mara_pipelines/execution.py | 18 +++++++++++++++--- mara_pipelines/logging/run_log.py | 22 ++++++++++++++++++++-- setup.py | 6 ++++++ tests/test_execute_pipeline.py | 26 ++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 5 deletions(-) create mode 100644 tests/test_execute_pipeline.py diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index 60a4a32..afc0c39 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -53,6 +53,13 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None, # A queue for receiving events from forked sub processes event_queue = multiprocessing_context.Queue() + # The historical node cost is taken from the 'mara' db. If the 'mara' database is not defined + # (e.g. in testing scenarios) we do not use the node cost. + import mara_db.config + use_historical_node_cost = 'mara' in mara_db.config.databases() + if not use_historical_node_cost: + print(f"[WARNING] The 'mara' database is not defined. 
The historical node costs are not used, the execution path might be inefficient.", file=sys.stderr) + # The function that is run in a sub process def run(): @@ -64,7 +71,7 @@ def run(): node_queue: [pipelines.Node] = [] # data needed for computing cost - node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline) + node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline) if use_historical_node_cost else {} # Putting nodes into the node queue def queue(nodes: [pipelines.Node]): @@ -207,7 +214,8 @@ def track_finished_pipelines(): next_node.add_dependency(pipeline_node, downstream) # get cost information for children - node_durations_and_run_times.update(node_cost.node_durations_and_run_times(next_node)) + if use_historical_node_cost: + node_durations_and_run_times.update(node_cost.node_durations_and_run_times(next_node)) # queue all child nodes queue(list(next_node.nodes.values())) @@ -314,7 +322,11 @@ def track_finished_pipelines(): run_process = multiprocessing_context.Process(target=run, name='pipeline-' + '-'.join(pipeline.path())) run_process.start() - runlogger = run_log.RunLogger() + if 'mara' in mara_db.config.databases(): + runlogger = run_log.RunLogger() + else: + runlogger = run_log.NullLogger() + print(f"[WARNING] The events of the pipeline execution are not saved in a db", file=sys.stderr) # make sure that we close this run (if still open) as failed when we close this python process # On SIGKILL we will still leave behind open runs... diff --git a/mara_pipelines/logging/run_log.py b/mara_pipelines/logging/run_log.py index 5781cbf..d40efd5 100644 --- a/mara_pipelines/logging/run_log.py +++ b/mara_pipelines/logging/run_log.py @@ -1,10 +1,8 @@ """Logging pipeline runs, node output and status information in mara database""" -import psycopg2.extensions import sqlalchemy.orm from sqlalchemy.ext.declarative import declarative_base -import mara_db.postgresql from .. 
import config from ..logging import pipeline_events, system_statistics from .. import events @@ -73,6 +71,15 @@ def close_open_run_after_error(run_id: int): """Closes all open run and node_run for this run_id as failed""" if run_id is None: return + + import mara_db.config + + if 'mara' not in mara_db.config.databases(): + return + + import psycopg2.extensions + import mara_db.postgresql + _close_run = f''' UPDATE data_integration_run SET end_time = now(), succeeded = FALSE @@ -98,11 +105,22 @@ def close_open_run_after_error(run_id: int): print(f'Cleaned up open runs/node_runs (run_id = {run_id})') +class NullLogger(events.EventHandler): + """A run logger not handling events""" + run_id: int = None + + def handle_event(self, event: events.Event): + pass + + class RunLogger(events.EventHandler): + """A run logger saving the pipeline events to the 'mara' database alias""" run_id: int = None node_output: {tuple: [pipeline_events.Output]} = None def handle_event(self, event: events.Event): + import psycopg2.extensions + import mara_db.postgresql if isinstance(event, pipeline_events.RunStarted): with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor diff --git a/setup.py b/setup.py index 85dbcff..a506c82 100755 --- a/setup.py +++ b/setup.py @@ -28,6 +28,12 @@ def get_long_description(): 'requests>=2.19.1' ], + extras_require={ + 'test': [ + 'pytest', + 'mara_app>=1.5.2'], + }, + setup_requires=['setuptools_scm'], include_package_data=True, diff --git a/tests/test_execute_pipeline.py b/tests/test_execute_pipeline.py new file mode 100644 index 0000000..0b93539 --- /dev/null +++ b/tests/test_execute_pipeline.py @@ -0,0 +1,26 @@ +import pytest + +from mara_app.monkey_patch import patch + +import mara_db.config +patch(mara_db.config.databases)(lambda: {}) + + +def test_execute_without_db(): + from mara_pipelines.commands.python import RunFunction + from mara_pipelines.pipelines import Pipeline, Task + from mara_pipelines.ui.cli 
import run_pipeline + + pipeline = Pipeline( + id='test_execute_without_db', + description="Tests if a pipeline can be executed without database") + + def command_function() -> bool: + return True + + pipeline.add( + Task(id='run_python_function', + description="Runs a sample python function", + commands=[RunFunction(function=command_function)])) + + assert run_pipeline(pipeline) \ No newline at end of file