Skip to content

Commit

Permalink
add a simple pipeline unit test without mara db (#71)
Browse files Browse the repository at this point in the history
add a simple pipeline unit test without mara db
  • Loading branch information
leo-schick authored May 13, 2022
1 parent 5fc9aff commit 4ec05ed
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 5 deletions.
18 changes: 15 additions & 3 deletions mara_pipelines/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
# A queue for receiving events from forked sub processes
event_queue = multiprocessing_context.Queue()

# The historical node cost is taken from the 'mara' db. If the 'mara' database is not defined
# (e.g. in testing scenarios) we do not use the node cost.
import mara_db.config
use_historical_node_cost = 'mara' in mara_db.config.databases()
if not use_historical_node_cost:
print(f"[WARNING] The 'mara' database is not defined. The historical node costs are not used, the execution path might be inefficient.", file=sys.stderr)

# The function that is run in a sub process
def run():

Expand All @@ -64,7 +71,7 @@ def run():
node_queue: [pipelines.Node] = []

# data needed for computing cost
node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline)
node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline) if use_historical_node_cost else {}

# Putting nodes into the node queue
def queue(nodes: [pipelines.Node]):
Expand Down Expand Up @@ -207,7 +214,8 @@ def track_finished_pipelines():
next_node.add_dependency(pipeline_node, downstream)

# get cost information for children
node_durations_and_run_times.update(node_cost.node_durations_and_run_times(next_node))
if use_historical_node_cost:
node_durations_and_run_times.update(node_cost.node_durations_and_run_times(next_node))

# queue all child nodes
queue(list(next_node.nodes.values()))
Expand Down Expand Up @@ -314,7 +322,11 @@ def track_finished_pipelines():
run_process = multiprocessing_context.Process(target=run, name='pipeline-' + '-'.join(pipeline.path()))
run_process.start()

runlogger = run_log.RunLogger()
if 'mara' in mara_db.config.databases():
runlogger = run_log.RunLogger()
else:
runlogger = run_log.NullLogger()
print(f"[WARNING] The events of the pipeline execution are not saved in a db", file=sys.stderr)

# make sure that we close this run (if still open) as failed when we close this python process
# On SIGKILL we will still leave behind open runs...
Expand Down
22 changes: 20 additions & 2 deletions mara_pipelines/logging/run_log.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
"""Logging pipeline runs, node output and status information in mara database"""

import psycopg2.extensions
import sqlalchemy.orm
from sqlalchemy.ext.declarative import declarative_base

import mara_db.postgresql
from .. import config
from ..logging import pipeline_events, system_statistics
from .. import events
Expand Down Expand Up @@ -73,6 +71,15 @@ def close_open_run_after_error(run_id: int):
"""Closes all open run and node_run for this run_id as failed"""
if run_id is None:
return

import mara_db.config

if 'mara' not in mara_db.config.databases():
return

import psycopg2.extensions
import mara_db.postgresql

_close_run = f'''
UPDATE data_integration_run
SET end_time = now(), succeeded = FALSE
Expand All @@ -98,11 +105,22 @@ def close_open_run_after_error(run_id: int):
print(f'Cleaned up open runs/node_runs (run_id = {run_id})')


class NullLogger(events.EventHandler):
    """Event handler that discards every event instead of persisting it.

    Exposes the same `run_id` attribute and `handle_event` method as
    `RunLogger`, but never stores anything.
    """
    # Never assigned by this class, so it stays None.
    run_id: int = None

    def handle_event(self, event: events.Event):
        """Ignore `event`; nothing is stored or forwarded."""
        return None


class RunLogger(events.EventHandler):
"""A run logger saving the pipeline events to the 'mara' database alias"""
run_id: int = None
node_output: {tuple: [pipeline_events.Output]} = None

def handle_event(self, event: events.Event):
import psycopg2.extensions
import mara_db.postgresql

if isinstance(event, pipeline_events.RunStarted):
with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor
Expand Down
6 changes: 6 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def get_long_description():
'requests>=2.19.1'
],

extras_require={
'test': [
'pytest',
'mara_app>=1.5.2'],
},

setup_requires=['setuptools_scm'],
include_package_data=True,

Expand Down
26 changes: 26 additions & 0 deletions tests/test_execute_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest

from mara_app.monkey_patch import patch

import mara_db.config
# Replace mara_db.config.databases with a stub that returns an empty dict, so no
# database alias (in particular no 'mara' alias) is configured for the tests below.
# NOTE(review): assumes mara_app's `patch` swaps the function at import time — confirm.
patch(mara_db.config.databases)(lambda: {})


def test_execute_without_db():
    """Build a pipeline with a single Python-function task and check that it runs successfully."""
    from mara_pipelines.commands.python import RunFunction
    from mara_pipelines.pipelines import Pipeline, Task
    from mara_pipelines.ui.cli import run_pipeline

    def command_function() -> bool:
        # trivial command that always reports success
        return True

    # the only task of the pipeline: call the sample function above
    sample_task = Task(
        id='run_python_function',
        description="Runs a sample python function",
        commands=[RunFunction(function=command_function)])

    pipeline = Pipeline(
        id='test_execute_without_db',
        description="Tests if a pipeline can be executed without database")
    pipeline.add(sample_task)

    # run_pipeline's result must be truthy for the test to pass
    succeeded = run_pipeline(pipeline)
    assert succeeded

0 comments on commit 4ec05ed

Please sign in to comment.