Skip to content

Commit

Permalink
refactor and clean up OutputQueueListener
Browse files Browse the repository at this point in the history
- Consolidate OutputQueueListener to use multiprocessing exclusively.
- Remove threading implementation and related configurations.
- Update tests and documentation to reflect these changes.
  • Loading branch information
dtrai2 committed Oct 24, 2024
1 parent d42b3f0 commit 6acb311
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 98 deletions.
6 changes: 3 additions & 3 deletions doc/source/development/connector_how_to.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Implementing a new Connector

Connectors are used to fetch or store log messages.
Input and output connectors each work independently, with the exception that an output connector
might call a callback function inside the input, to notify that the current batch was successfully
might call a callback function inside the input, to notify that the current batch was successfully
processed. Only then would the input start collecting new inputs.
Because of this independence, it is possible to receive messages from one system and to store them
in another, e.g. reading from Kafka and writing to OpenSearch.
Expand Down Expand Up @@ -40,7 +40,7 @@ An exception should be thrown if an error occurs on calling this function.
These exceptions must inherit from the exception classes in :py:class:`~logprep.input.input.Input`.
They should return a helpful message when calling `str(exception)`.
Exceptions requiring Logprep to restart should inherit from `FatalInputError`.
Exceptions that inherit from `WarningInputError` will be logged, but they do not require any error
Exceptions that inherit from `WarningInputError` will be logged, but they do not require any error
handling.

.. _connector_output:
Expand Down Expand Up @@ -70,7 +70,7 @@ Error Output

Error output is set up in :py:class:`~logprep.framework.pipeline_manager.PipelineManager`. The error
output connector is instantiated and set up only once during the initialization of the pipeline manager
together with :py:class:`~logprep.framework.pipeline_manager.ComponentQueueListener`.
together with :py:class:`~logprep.framework.pipeline_manager.OutputQueueListener`.
The listener is used to listen on the populated error queue and to send the log messages to the
:code:`store` method of the error output connector.
The error queue is given to the listener and to all pipelines instantiated by the pipeline manager.
Expand Down
1 change: 0 additions & 1 deletion examples/exampledata/config/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ timeout: 0.1
restart_count: 2
config_refresh_interval: 5
error_backlog_size: 1500000
component_queue_listener_implementation: multiprocessing
logger:
level: INFO
format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
Expand Down
27 changes: 8 additions & 19 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import multiprocessing.queues
import random
import sys
import threading
import time
from typing import Any

Expand Down Expand Up @@ -58,7 +57,7 @@ def put(self, obj, block=True, timeout=None, batch_size=1):


@define()
class ComponentQueueListener:
class OutputQueueListener:
"""This forks a process and handles all items from the given queue into
the specified callable. It uses a sentinel object to stop the process."""

Expand All @@ -73,28 +72,19 @@ class ComponentQueueListener:
config: dict = field(validator=validators.instance_of(dict))
"""The configuration of the component used in this listener instance."""

sentinel: Any = field(default=None)
sentinel: Any = field(default=None, init=False)
"""The sentinel object to stop the process. This has to implement identity comparison."""

_instance: multiprocessing.Process | threading.Thread = field(init=False)
_process: multiprocessing.Process = field(init=False)
"""The process that is forked to listen to the queue."""

_implementation: str = field(
default="threading", validator=validators.in_(["threading", "multiprocessing"])
)
"""The implementation to use for the listener. Options are threading or multiprocessing.
Default is threading."""

def __attrs_post_init__(self):
if self._implementation == "threading":
self._instance = threading.Thread(target=self._listen, daemon=True)
elif self._implementation == "multiprocessing":
self._instance = multiprocessing.Process(target=self._listen, daemon=True)
self._process = multiprocessing.Process(target=self._listen, daemon=True)

def start(self):
"""Start the listener."""
logger.debug("Starting listener with target: %s", self.target)
self._instance.start()
self._process.start()

def get_component_instance(self):
component = Factory.create(self.config)
Expand Down Expand Up @@ -144,7 +134,7 @@ def _drain_queue(self, target):
def stop(self):
"""Stop the listener."""
self.queue.put(self.sentinel)
self._instance.join()
self._process.join()
logger.debug("Stopped listener.")


Expand Down Expand Up @@ -185,7 +175,7 @@ def __init__(self, configuration: Configuration):
self.metrics = self.Metrics(labels={"component": "manager"})
self.loghandler: LogprepMPQueueListener = None
self.error_queue: multiprocessing.Queue | None = None
self._error_listener: ComponentQueueListener | None = None
self._error_listener: OutputQueueListener | None = None
self._configuration: Configuration = configuration
self._pipelines: list[multiprocessing.Process] = []
self.prometheus_exporter: PrometheusExporter | None = None
Expand All @@ -207,11 +197,10 @@ def _setup_error_queue(self):
self.error_queue = ThrottlingQueue(
multiprocessing.get_context(), self._configuration.error_backlog_size
)
self._error_listener = ComponentQueueListener(
self._error_listener = OutputQueueListener(
self.error_queue,
"store",
self._configuration.error_output,
implementation=self._configuration.component_queue_listener_implementation,
)
self._error_listener.start()
# wait for the error listener to be ready before starting the pipelines
Expand Down
11 changes: 4 additions & 7 deletions logprep/util/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,10 @@ class LoggerConfig:
format: str = field(default="", validator=[validators.instance_of(str)], eq=False)
"""The format of the log message as supported by the :code:`LogprepFormatter`.
Defaults to :code:`"%(asctime)-15s %(name)-10s %(levelname)-8s: %(message)s"`.
.. autoclass:: logprep.util.logging.LogprepFormatter
:no-index:
"""
datefmt: str = field(default="", validator=[validators.instance_of(str)], eq=False)
"""The date format of the log message. Defaults to :code:`"%Y-%m-%d %H:%M:%S"`."""
Expand Down Expand Up @@ -545,9 +545,9 @@ class Configuration:
converter=lambda x: MetricsConfig(**x) if isinstance(x, dict) else x,
eq=False,
)
"""Metrics configuration. Defaults to
"""Metrics configuration. Defaults to
:code:`{"enabled": False, "port": 8000, "uvicorn_config": {}}`.
The key :code:`uvicorn_config` can be configured with any uvicorn config parameters.
For further information see the `uvicorn documentation <https://www.uvicorn.org/settings/>`_.
Expand Down Expand Up @@ -579,9 +579,6 @@ class Configuration:
validator=validators.instance_of(int), default=DEFAULT_MESSAGE_BACKLOG_SIZE, eq=False
)
"""Size of the error backlog. Defaults to :code:`15000`."""
component_queue_listener_implementation: str = field(
validator=validators.in_(("threading", "multiprocessing")), default="threading", eq=False
)

_getter: Getter = field(
validator=validators.instance_of(Getter),
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/connector/test_opensearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,10 @@ def test_write_backlog_clears_message_backlog_on_failure(self):
assert len(self.object._message_backlog) == 0, "Message backlog should be cleared"

def test_write_backlog_clears_failed_on_success(self):
self.object._message_backlog = [{"some": "event"}]
self.object._failed = [{"some": "event"}]
self.object._write_backlog()
with mock.patch("logprep.connector.opensearch.output.OpensearchOutput._bulk"):
self.object._write_backlog()
assert len(self.object._failed) == 0, "temporary failed backlog should be cleared"

def test_write_backlog_clears_failed_on_failure(self):
Expand Down
Loading

0 comments on commit 6acb311

Please sign in to comment.