From 355fcca72fa33beed7db4dc24c3846aa38da896e Mon Sep 17 00:00:00 2001 From: dtrai2 Date: Mon, 28 Oct 2024 12:56:02 +0100 Subject: [PATCH] add tests pipeline manager - Introduced tests for `listen` and `drain_queue` methods. - Verified logging of unexpected exceptions during queue processing. - Ensured specific items and sentinel values are ignored during queue operations. - Increases test coverage --- tests/unit/framework/test_pipeline_manager.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index d44d08dd3..8729632e8 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -3,6 +3,7 @@ # pylint: disable=attribute-defined-outside-init # pylint: disable=unnecessary-lambda-assignment import multiprocessing +import time from copy import deepcopy from logging import Logger from logging.config import dictConfig @@ -194,6 +195,21 @@ def test_restart_failed_pipelines_sets_old_pipeline_index(self): pipeline_manager.restart_failed_pipeline() mock_create_pipeline.assert_called_once_with(1) + def test_restart_failed_pipeline_adds_error_output_health_check_to_metrics_exporter( + self, tmp_path + ): + mock_env = {"PROMETHEUS_MULTIPROC_DIR": str(tmp_path)} + with mock.patch.dict("os.environ", mock_env): + with mock.patch("logprep.framework.pipeline_manager.OutputQueueListener"): + with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue") as mock_queue: + self.config.metrics = MetricsConfig(enabled=True) + self.config.error_output = {"dummy": {"type": "dummy_output"}} + pipeline_manager = PipelineManager(self.config) + mock_export = mock.MagicMock() + pipeline_manager.prometheus_exporter = mock_export + pipeline_manager.set_count(1) + mock_export.update_healthchecks.assert_called() + def test_pipeline_manager_sets_queue_size_for_http_input(self): config = deepcopy(self.config) config.input = { @@ -587,3 +603,80 @@ def test_listen_ensures_error_queue_is_closed_after_drained(self): listener._listen() with pytest.raises(ValueError, match="is closed"): listener.queue.put("test") + + @mock.patch( + "logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance", + ) + def test_listen_ignores_sync_item_1_and_sentinel(self, mock_get_output_instance): + mock_component = mock.MagicMock() + mock_target = mock.MagicMock() + mock_component.store = mock_target + mock_get_output_instance.return_value = mock_component + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = OutputQueueListener(queue, target, output_config) + listener.queue.put(1) + listener.queue.put(listener.sentinel) + listener._listen() + mock_target.assert_not_called() + + @mock.patch( + "logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance", + ) + @mock.patch("logging.Logger.error") + def test_listen_logs_event_on_unexpected_exception(self, mock_error, mock_get_output_instance): + mock_component = mock.MagicMock() + mock_target = mock.MagicMock() + mock_target.side_effect = Exception("TestException") + mock_component.store = mock_target + mock_get_output_instance.return_value = mock_component + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = OutputQueueListener(queue, target, output_config) + event = {"event": "test"} + listener.queue.put(event) + listener.queue.put(listener.sentinel) + time.sleep(0.1) # test will sometimes fail without it, probably due to ThrottlingQueue + listener._listen() + expected_error_log = ( + f"[Error Event] Couldn't enqueue error item due to: TestException | Item: '{event}'" + ) + mock_error.assert_called_with(expected_error_log) + + @mock.patch( + "logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance", + new=mock.MagicMock(), + ) + def test_drain_queue_ignores_sync_item_1_and_sentinel(self): + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = OutputQueueListener(queue, target, output_config) + listener.queue.put(1) + listener.queue.put(listener.sentinel) + mock_target = mock.MagicMock() + listener._drain_queue(mock_target) + mock_target.assert_not_called() + + @mock.patch( + "logprep.framework.pipeline_manager.OutputQueueListener.get_output_instance", + new=mock.MagicMock(), + ) + @mock.patch("logging.Logger.error") + def test_drain_queue_logs_event_on_unexpected_exception(self, mock_error): + target = "store" + output_config = {"random_name": {"type": "dummy_output"}} + queue = ThrottlingQueue(multiprocessing.get_context(), 100) + listener = OutputQueueListener(queue, target, output_config) + event = {"event": "test"} + listener.queue.put(event) + mock_target = mock.MagicMock() + mock_target.side_effect = Exception("TestException") + time.sleep(0.1) # test will sometimes fail without it, probably due to ThrottlingQueue + listener._drain_queue(mock_target) + expected_error_log = ( + f"[Error Event] Couldn't enqueue error item due to: TestException | Item: '{event}'" + ) + mock_error.assert_called_with(expected_error_log)