diff --git a/CHANGELOG.md b/CHANGELOG.md index 492d8e218..21e33b289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * Fix restarting of logprep every time the kafka input connector receives events that aren't valid json documents. Now the documents will be written to the error output. +* Fix ProcessCounter to actually print counts periodically and not only once events are processed ## v6.7.0 ### Improvements diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 45968e70c..3c197c74f 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -9,16 +9,16 @@ # pylint: disable=logging-fstring-interpolation import queue import warnings -from ctypes import c_bool, c_double, c_ulonglong +from ctypes import c_bool, c_ulonglong from functools import cached_property from logging import INFO, NOTSET, Handler, Logger from multiprocessing import Lock, Process, Value, current_process -from time import time from typing import Any, List, Tuple import attrs import msgspec import numpy as np +from schedule import Scheduler from logprep._version import get_versions from logprep.abc.component import Component @@ -62,21 +62,21 @@ class MustProvideAnMPLogHandlerError(BaseException): class SharedCounter: """A shared counter for multiprocessing pipelines.""" - CHECKING_PERIOD = 0.5 - def __init__(self): self._val = Value(c_ulonglong, 0) - self._lock = Lock() - self._timer = Value(c_double, 0) - self._checking_timer = 0 + self._printed = Value(c_bool, False) + self._lock = None self._logger = None self._period = None + self.scheduler = Scheduler() def _init_timer(self, period: float): if self._period is None: self._period = period - with self._lock: - self._timer.value = time() + self._period + jobs = map(lambda job: job.job_func.func, self.scheduler.jobs) + if self.print_value not in jobs and self.reset_printed not in jobs: + self.scheduler.every(int(self._period)).seconds.do(self.print_value) + self.scheduler.every(int(self._period + 1)).seconds.do(self.reset_printed) def _create_logger(self, log_handler: Handler): if self._logger is None: @@ -84,38 +84,36 @@ def _create_logger(self, log_handler: Handler): for handler in logger.handlers: logger.removeHandler(handler) logger.addHandler(log_handler) - self._logger = logger - def setup(self, print_processed_period: float, log_handler: Handler): + def setup(self, print_processed_period: float, log_handler: Handler, lock: Lock): """Setup shared counter for multiprocessing pipeline.""" self._create_logger(log_handler) self._init_timer(print_processed_period) - self._checking_timer = time() + self.CHECKING_PERIOD + self._lock = lock def increment(self): """Increment the counter.""" with self._lock: self._val.value += 1 - def print_if_ready(self): - """Periodically print the counter and reset it.""" - current_time = time() - if current_time > self._checking_timer: - self._checking_timer = current_time + self.CHECKING_PERIOD - if self._timer.value != 0 and current_time >= self._timer.value: - with self._lock: - if self._period / 60.0 < 1: - msg = f"Processed events per {self._period} seconds: {self._val.value}" - else: - msg = ( - f"Processed events per {self._period / 60.0:.2f} minutes: " - f"{self._val.value}" - ) - if self._logger: - self._logger.info(msg) - self._val.value = 0 - self._timer.value = time() + self._period + def reset_printed(self): + """Reset the printed flag after the configured period + 1""" + with self._lock: + self._printed.value = False + + def print_value(self): + """Print the number of processed event in the last interval""" + with self._lock: + if not self._printed.value: + period_human_form = f"{self._period} seconds" + if self._period / 60.0 > 1: + period_human_form = f"{self._period / 60.0:.2f} minutes" + self._logger.info( + f"Processed events per {period_human_form}: " f"{self._val.value}" + ) + self._val.value = 0 + self._printed.value = True def _handle_pipeline_error(func): @@ -237,6 +235,9 @@ def __init__( self._lock = lock self._shared_dict = shared_dict self._processing_counter = counter + if self._processing_counter: + print_processed_period = self._logprep_config.get("print_processed_period", 300) + self._processing_counter.setup(print_processed_period, log_handler, lock) self._used_server_ports = used_server_ports self._metric_targets = metric_targets self.pipeline_index = pipeline_index @@ -390,6 +391,8 @@ def process_pipeline(self) -> Tuple[dict, list]: assert self._input, "Run process_pipeline only with an valid input connector" self._metrics_exposer.expose(self.metrics) Component.run_pending_tasks() + if self._processing_counter: + self._processing_counter.scheduler.run_pending() extra_outputs = [] event = None try: @@ -452,7 +455,6 @@ def process_event(self, event: dict): break if self._processing_counter: self._processing_counter.increment() - self._processing_counter.print_if_ready() if self.metrics: self.metrics.number_of_processed_events += 1 return extra_outputs @@ -510,8 +512,6 @@ def __init__( raise MustProvideAnMPLogHandlerError self._profile = config.get("profile_pipelines", False) - print_processed_period = config.get("print_processed_period", 300) - self.processed_counter.setup(print_processed_period, log_handler) Pipeline.__init__( self, diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 5d36163b8..5f5c3eee9 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -1,6 +1,9 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init +import logging +import re +import time from copy import deepcopy from logging import DEBUG, WARNING, getLogger from multiprocessing import Lock, active_children @@ -687,6 +690,13 @@ def test_event_with_critical_input_parsing_error_is_stored_in_error_output(self, self.pipeline.process_pipeline() self.pipeline._output["dummy"].store_failed.assert_called() + def test_process_pipeline_calls_shared_counter_scheduler(self, _): + self.pipeline._setup() + self.pipeline._input.get_next.return_value = ({}, {}) + self.pipeline._processing_counter = mock.MagicMock() + self.pipeline.process_pipeline() + assert self.pipeline._processing_counter.scheduler.run_pending.call_count == 1 + class TestPipelineWithActualInput: def setup_method(self): @@ -920,3 +930,24 @@ def start_and_stop_pipeline(wrapper): wrapper.join() return children_running + + +class TestSharedCounter: + test_logger = getLogger("test-logger") + + def test_shared_counter_prints_value_after_configured_period(self, caplog): + with caplog.at_level(logging.INFO): + shared_counter = SharedCounter() + print_period = 1 + shared_counter._logger = self.test_logger + shared_counter.setup(print_period, None, Lock()) + test_counter = 0 + test_counter_limit = 100 + start_time = time.time() + while time.time() - start_time < print_period: + if test_counter < test_counter_limit: + shared_counter.increment() + test_counter += 1 + shared_counter.scheduler.run_pending() + message = f".*Processed events per {print_period} seconds: {test_counter_limit}.*" + assert re.match(message, caplog.text)