Skip to content

Commit

Permalink
add tests pipeline manager
Browse files Browse the repository at this point in the history
- Introduced tests for `listen` and `drain_queue` methods.
- Verified logging of unexpected exceptions during queue processing.
- Ensured specific items and sentinel values are ignored during queue operations.
- Increases test coverage
  • Loading branch information
dtrai2 committed Oct 28, 2024
1 parent de06bd6 commit 355fcca
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=attribute-defined-outside-init
# pylint: disable=unnecessary-lambda-assignment
import multiprocessing
import time
from copy import deepcopy
from logging import Logger
from logging.config import dictConfig
Expand Down Expand Up @@ -194,6 +195,21 @@ def test_restart_failed_pipelines_sets_old_pipeline_index(self):
pipeline_manager.restart_failed_pipeline()
mock_create_pipeline.assert_called_once_with(1)

def test_restart_failed_pipeline_adds_error_output_health_check_to_metrics_exporter(
self, tmp_path
):
mock_env = {"PROMETHEUS_MULTIPROC_DIR": str(tmp_path)}
with mock.patch.dict("os.environ", mock_env):
with mock.patch("logprep.framework.pipeline_manager.OutputQueueListener"):
with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue") as mock_queue:
self.config.metrics = MetricsConfig(enabled=True)
self.config.error_output = {"dummy": {"type": "dummy_output"}}
pipeline_manager = PipelineManager(self.config)
mock_export = mock.MagicMock()
pipeline_manager.prometheus_exporter = mock_export
pipeline_manager.set_count(1)
mock_export.update_healthchecks.assert_called()

def test_pipeline_manager_sets_queue_size_for_http_input(self):
config = deepcopy(self.config)
config.input = {
Expand Down Expand Up @@ -587,3 +603,80 @@ def test_listen_ensures_error_queue_is_closed_after_drained(self):
listener._listen()
with pytest.raises(ValueError, match="is closed"):
listener.queue.put("test")

@mock.patch(
"logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance",
)
def test_listen_ignores_sync_item_1_and_sentinel(self, mock_get_output_instance):
mock_component = mock.MagicMock()
mock_target = mock.MagicMock()
mock_component.store = mock_target
mock_get_output_instance.return_value = mock_component
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = OutputQueueListener(queue, target, output_config)
listener.queue.put(1)
listener.queue.put(listener.sentinel)
listener._listen()
mock_target.assert_not_called()

@mock.patch(
"logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance",
)
@mock.patch("logging.Logger.error")
def test_listen_logs_event_on_unexpected_exception(self, mock_error, mock_get_output_instance):
mock_component = mock.MagicMock()
mock_target = mock.MagicMock()
mock_target.side_effect = Exception("TestException")
mock_component.store = mock_target
mock_get_output_instance.return_value = mock_component
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = OutputQueueListener(queue, target, output_config)
event = {"event": "test"}
listener.queue.put(event)
listener.queue.put(listener.sentinel)
time.sleep(0.1) # test will sometimes fail without it, probably due to ThrottlingQueue
listener._listen()
expected_error_log = (
f"[Error Event] Couldn't enqueue error item due to: TestException | Item: '{event}'"
)
mock_error.assert_called_with(expected_error_log)

@mock.patch(
"logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance",
new=mock.MagicMock(),
)
def test_drain_queue_ignores_sync_item_1_and_sentinel(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = OutputQueueListener(queue, target, output_config)
listener.queue.put(1)
listener.queue.put(listener.sentinel)
mock_target = mock.MagicMock()
listener._drain_queue(mock_target)
mock_target.assert_not_called()

@mock.patch(
"logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance",
new=mock.MagicMock(),
)
@mock.patch("logging.Logger.error")
def test_drain_queue_logs_event_on_unexpected_exception(self, mock_error):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
listener = OutputQueueListener(queue, target, output_config)
event = {"event": "test"}
listener.queue.put(event)
mock_target = mock.MagicMock()
mock_target.side_effect = Exception("TestException")
time.sleep(0.1) # test will sometimes fail without it, probably due to ThrottlingQueue
listener._drain_queue(mock_target)
expected_error_log = (
f"[Error Event] Couldn't enqueue error item due to: TestException | Item: '{event}'"
)
mock_error.assert_called_with(expected_error_log)

0 comments on commit 355fcca

Please sign in to comment.