revise processing counter #431

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@

* Fix restarting of logprep every time the kafka input connector receives events that aren't valid
JSON documents. Such documents are now written to the error output.
* Fix ProcessCounter so that it prints counts periodically and not only while events are being processed

## v6.7.0
### Improvements
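For context on the counter entry: before this change, the timer check lived in `print_if_ready()`, which the pipeline only called from `process_event()` (see the line removed below), so an idle pipeline never printed its counts. A minimal sketch of that failure mode, with illustrative names rather than logprep's actual classes:

```python
import time


class EventDrivenCounter:
    """Illustrative stand-in for the pre-fix behaviour, not logprep code."""

    def __init__(self, period: float):
        self.count = 0
        self.deadline = time.time() + period

    def print_if_ready(self):
        # This check only runs when an event arrives, so with no
        # traffic the deadline passes silently and nothing is printed.
        if time.time() >= self.deadline:
            print(f"Processed events: {self.count}")
            self.count = 0
            self.deadline = time.time() + 1.0


counter = EventDrivenCounter(period=1.0)
time.sleep(1.5)  # pipeline idle: print_if_ready() is never called, no output
```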
66 changes: 33 additions & 33 deletions logprep/framework/pipeline.py
@@ -9,16 +9,16 @@
# pylint: disable=logging-fstring-interpolation
import queue
import warnings
from ctypes import c_bool, c_double, c_ulonglong
from ctypes import c_bool, c_ulonglong
from functools import cached_property
from logging import INFO, NOTSET, Handler, Logger
from multiprocessing import Lock, Process, Value, current_process
from time import time
from typing import Any, List, Tuple

import attrs
import msgspec
import numpy as np
from schedule import Scheduler

from logprep._version import get_versions
from logprep.abc.component import Component
@@ -62,60 +62,58 @@ class MustProvideAnMPLogHandlerError(BaseException):
class SharedCounter:
"""A shared counter for multiprocessing pipelines."""

CHECKING_PERIOD = 0.5

def __init__(self):
self._val = Value(c_ulonglong, 0)
self._lock = Lock()
self._timer = Value(c_double, 0)
self._checking_timer = 0
self._printed = Value(c_bool, False)
self._lock = None
self._logger = None
self._period = None
self.scheduler = Scheduler()

def _init_timer(self, period: float):
if self._period is None:
self._period = period
with self._lock:
self._timer.value = time() + self._period
jobs = [job.job_func.func for job in self.scheduler.jobs]  # a list: a map iterator would be exhausted by the first membership check
if self.print_value not in jobs and self.reset_printed not in jobs:
self.scheduler.every(int(self._period)).seconds.do(self.print_value)
self.scheduler.every(int(self._period + 1)).seconds.do(self.reset_printed)

def _create_logger(self, log_handler: Handler):
if self._logger is None:
logger = Logger("Processing Counter", level=log_handler.level)
for handler in logger.handlers:
logger.removeHandler(handler)
logger.addHandler(log_handler)

self._logger = logger

def setup(self, print_processed_period: float, log_handler: Handler):
def setup(self, print_processed_period: float, log_handler: Handler, lock: Lock):
"""Setup shared counter for multiprocessing pipeline."""
self._create_logger(log_handler)
self._init_timer(print_processed_period)
self._checking_timer = time() + self.CHECKING_PERIOD
self._lock = lock

def increment(self):
"""Increment the counter."""
with self._lock:
self._val.value += 1

def print_if_ready(self):
"""Periodically print the counter and reset it."""
current_time = time()
if current_time > self._checking_timer:
self._checking_timer = current_time + self.CHECKING_PERIOD
if self._timer.value != 0 and current_time >= self._timer.value:
with self._lock:
if self._period / 60.0 < 1:
msg = f"Processed events per {self._period} seconds: {self._val.value}"
else:
msg = (
f"Processed events per {self._period / 60.0:.2f} minutes: "
f"{self._val.value}"
)
if self._logger:
self._logger.info(msg)
self._val.value = 0
self._timer.value = time() + self._period
def reset_printed(self):
"""Reset the printed flag after the configured period + 1"""
with self._lock:
self._printed.value = False

def print_value(self):
"""Print the number of processed event in the last interval"""
with self._lock:
if not self._printed.value:
period_human_form = f"{self._period} seconds"
if self._period / 60.0 > 1:
period_human_form = f"{self._period / 60.0:.2f} minutes"
self._logger.info(
f"Processed events per {period_human_form}: " f"{self._val.value}"
)
self._val.value = 0
self._printed.value = True


def _handle_pipeline_error(func):
@@ -237,6 +235,9 @@ def __init__(
self._lock = lock
self._shared_dict = shared_dict
self._processing_counter = counter
if self._processing_counter:
print_processed_period = self._logprep_config.get("print_processed_period", 300)
self._processing_counter.setup(print_processed_period, log_handler, lock)
self._used_server_ports = used_server_ports
self._metric_targets = metric_targets
self.pipeline_index = pipeline_index
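The period used in the setup above comes from the logprep configuration, falling back to 300 seconds (five minutes) when the key is absent; a minimal sketch of the lookup, using a plain dict to stand in for `self._logprep_config`:

```python
# `logprep_config` stands in for the parsed pipeline configuration;
# the key name is taken from the diff above, the value is illustrative.
logprep_config = {"print_processed_period": 10}

# Defaults to 300 seconds (five minutes) when the key is missing.
print_processed_period = logprep_config.get("print_processed_period", 300)
assert print_processed_period == 10
```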
@@ -390,6 +391,8 @@ def process_pipeline(self) -> Tuple[dict, list]:
assert self._input, "Run process_pipeline only with a valid input connector"
self._metrics_exposer.expose(self.metrics)
Component.run_pending_tasks()
if self._processing_counter:
self._processing_counter.scheduler.run_pending()
extra_outputs = []
event = None
try:
@@ -452,7 +455,6 @@ def process_event(self, event: dict):
break
if self._processing_counter:
self._processing_counter.increment()
self._processing_counter.print_if_ready()
if self.metrics:
self.metrics.number_of_processed_events += 1
return extra_outputs
@@ -510,8 +512,6 @@ def __init__(
raise MustProvideAnMPLogHandlerError

self._profile = config.get("profile_pipelines", False)
print_processed_period = config.get("print_processed_period", 300)
self.processed_counter.setup(print_processed_period, log_handler)

Pipeline.__init__(
self,
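Taken together, the revised design registers two `schedule` jobs once during setup and lets the pipeline loop drive them from `process_pipeline()` via `scheduler.run_pending()`, so printing no longer depends on events arriving. A minimal standalone sketch of that pattern, assuming only the `schedule` package; the counter and names are illustrative, not logprep's API:

```python
import time

from schedule import Scheduler

processed = 0
scheduler = Scheduler()


def print_count():
    global processed
    print(f"Processed events per 2 seconds: {processed}")
    processed = 0


# Registered once, as SharedCounter._init_timer does for print_value.
scheduler.every(2).seconds.do(print_count)

# Stand-in for the pipeline loop: run_pending() fires the job whenever its
# interval has elapsed, even during iterations that process no events.
for _ in range(5):
    scheduler.run_pending()
    time.sleep(1)
```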
31 changes: 31 additions & 0 deletions tests/unit/framework/test_pipeline.py
@@ -1,6 +1,9 @@
# pylint: disable=missing-docstring
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
import logging
import re
import time
from copy import deepcopy
from logging import DEBUG, WARNING, getLogger
from multiprocessing import Lock, active_children
@@ -687,6 +690,13 @@ def test_event_with_critical_input_parsing_error_is_stored_in_error_output(self,
self.pipeline.process_pipeline()
self.pipeline._output["dummy"].store_failed.assert_called()

def test_process_pipeline_calls_shared_counter_scheduler(self, _):
self.pipeline._setup()
self.pipeline._input.get_next.return_value = ({}, {})
self.pipeline._processing_counter = mock.MagicMock()
self.pipeline.process_pipeline()
assert self.pipeline._processing_counter.scheduler.run_pending.call_count == 1


class TestPipelineWithActualInput:
def setup_method(self):
@@ -920,3 +930,24 @@ def start_and_stop_pipeline(wrapper):
wrapper.join()

return children_running


class TestSharedCounter:
test_logger = getLogger("test-logger")

def test_shared_counter_prints_value_after_configured_period(self, caplog):
with caplog.at_level(logging.INFO):
shared_counter = SharedCounter()
print_period = 1
shared_counter._logger = self.test_logger
shared_counter.setup(print_period, None, Lock())
test_counter = 0
test_counter_limit = 100
start_time = time.time()
while time.time() - start_time < print_period:
if test_counter < test_counter_limit:
shared_counter.increment()
test_counter += 1
shared_counter.scheduler.run_pending()
message = f".*Processed events per {print_period} seconds: {test_counter_limit}.*"
assert re.match(message, caplog.text)
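A note on the `_printed` flag exercised above: each pipeline process drives its own `Scheduler`, so with several processes multiple `print_value` jobs can fire within the same period. Only the first process to take the shared lock prints, and the staggered `reset_printed` job (period + 1 seconds) re-arms the flag for the next interval. A minimal sketch of that guard, assuming only the standard library:

```python
from ctypes import c_bool
from multiprocessing import Lock, Value

lock = Lock()
printed = Value(c_bool, False)


def print_once(pid: int) -> None:
    # Each pipeline process would call this within the same period;
    # only the first one to take the lock actually prints.
    with lock:
        if not printed.value:
            print(f"process {pid} prints the shared count")
            printed.value = True


for pid in range(3):
    print_once(pid)  # only pid 0 produces output
```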