Commit
Fix restart of exporter on logprep start
ekneg54 committed Sep 28, 2024
1 parent 06424be commit 718a353
Showing 8 changed files with 146 additions and 88 deletions.
56 changes: 40 additions & 16 deletions logprep/framework/pipeline_manager.py
@@ -8,7 +8,6 @@
 import multiprocessing.managers
 import multiprocessing.queues
 import random
-import signal
 import time
 
 from attr import define, field
@@ -89,18 +88,21 @@ def __init__(self, configuration: Configuration):
         self.restart_count = 0
         self.restart_timeout_ms = random.randint(100, 1000)
         self.metrics = self.Metrics(labels={"component": "manager"})
-        self.loghandler = None
+        self.loghandler: LogprepMPQueueListener = None
+        self._error_queue: multiprocessing.Queue | None = None
+        self._configuration: Configuration = configuration
+        self._pipelines: list[multiprocessing.Process] = []
+        self.prometheus_exporter: PrometheusExporter | None = None
         if multiprocessing.current_process().name == "MainProcess":
-            self._set_http_input_queue(configuration)
             self._setup_logging()
-        self._pipelines: list[multiprocessing.Process] = []
-        self._configuration = configuration
+            self._setup_prometheus_exporter()
+            self._set_http_input_queue()
 
     def _setup_prometheus_exporter(self):
         prometheus_config = self._configuration.metrics
-        if prometheus_config.enabled:
+        if prometheus_config.enabled and not self.prometheus_exporter:
             self.prometheus_exporter = PrometheusExporter(prometheus_config)
-        else:
-            self.prometheus_exporter = None
+            self.prometheus_exporter.prepare_multiprocessing()
 
     def _setup_logging(self):
         console_logger = logging.getLogger("console")
@@ -109,12 +111,13 @@ def _setup_logging(self):
         self.loghandler = LogprepMPQueueListener(logqueue, console_handler)
         self.loghandler.start()
 
-    def _set_http_input_queue(self, configuration):
+    def _set_http_input_queue(self):
         """
         this workaround has to be done because the queue size is not configurable
         after initialization and the queue has to be shared between the multiple processes
         """
-        input_config = next(iter(configuration.input.values()))
+        input_config = list(self._configuration.input.values())
+        input_config = input_config[0] if input_config else {}
         is_http_input = input_config.get("type") == "http_input"
         if not is_http_input and HttpInput.messages is not None:
             return
@@ -169,27 +172,39 @@ def restart_failed_pipeline(self):
             self._pipelines.insert(index, self._create_pipeline(pipeline_index))
             exit_code = failed_pipeline.exitcode
             logger.warning(
-                "Restarting failed pipeline on index %s " "with exit code: %s",
+                "Restarting failed pipeline on index %s with exit code: %s",
                 pipeline_index,
                 exit_code,
             )
         if self._configuration.restart_count < 0:
             return
+        self._wait_to_restart()
+
+    def _wait_to_restart(self):
         self.restart_count += 1
         time.sleep(self.restart_timeout_ms / 1000)
         self.restart_timeout_ms = self.restart_timeout_ms * 2
 
     def stop(self):
         """Stop processing any pipelines by reducing the pipeline count to zero."""
-        self._decrease_to_count(0)
+        self.set_count(0)
         if self.prometheus_exporter:
-            self.prometheus_exporter.server.server.handle_exit(signal.SIGTERM, None)
+            self.prometheus_exporter.server.shut_down()
             self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
+        logger.info("Shutdown complete")
+        if self.loghandler is not None:
+            self.loghandler.stop()
+
+    def start(self):
+        """Start processing."""
+        self.set_count(self._configuration.process_count)
 
-    def restart(self, daemon=True):
+    def restart(self):
         """Restarts all pipelines"""
-        if self.prometheus_exporter:
-            self.prometheus_exporter.run(daemon=daemon)
+        self.stop()
+        self.start()
+
+    def reload(self):
         self.set_count(0)
         self.set_count(self._configuration.process_count)

@@ -204,3 +219,12 @@ def _create_pipeline(self, index) -> multiprocessing.Process:
         process.start()
         logger.info("Created new pipeline")
         return process
+
+    def should_exit(self) -> bool:
+        """Check if the manager should exit."""
+        return all(
+            (
+                self._configuration.restart_count >= 0,
+                self.restart_count >= self._configuration.restart_count,
+            )
+        )
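Aside: taken together, the pipeline_manager.py changes make exporter setup idempotent. The exporter is created and prepared at most once, and restart() now just calls stop() and start(). A minimal sketch of the pattern with stand-in classes, not logprep's real API:

```python
# Stand-in classes illustrating the guard
# `if prometheus_config.enabled and not self.prometheus_exporter:`.
# Repeated setup calls become no-ops, so a restart cannot spawn a
# second exporter or redo its one-time multiprocessing preparation.
class Exporter:
    def __init__(self) -> None:
        self.prepared = False

    def prepare_multiprocessing(self) -> None:
        self.prepared = True  # one-time setup, e.g. a multiprocess dir


class Manager:
    def __init__(self, metrics_enabled: bool) -> None:
        self.exporter: Exporter | None = None
        self.metrics_enabled = metrics_enabled
        self._setup_exporter()

    def _setup_exporter(self) -> None:
        if self.metrics_enabled and not self.exporter:
            self.exporter = Exporter()
            self.exporter.prepare_multiprocessing()

    def restart(self) -> None:
        self._setup_exporter()  # safe to call again: never re-creates


manager = Manager(metrics_enabled=True)
first = manager.exporter
manager.restart()
assert manager.exporter is first  # the same instance survives restarts
```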
7 changes: 4 additions & 3 deletions logprep/metrics/exporter.py
@@ -53,7 +53,7 @@ def __init__(self, configuration: MetricsConfig):
         self.healthcheck_functions = None
         self._multiprocessing_prepared = False
 
-    def _prepare_multiprocessing(self):
+    def prepare_multiprocessing(self):
         """
         Sets up the proper metric registry for multiprocessing and handles the necessary
         temporary multiprocessing directory that the prometheus client expects.
@@ -93,7 +93,7 @@ def run(self, daemon=True):
             return
         port = self.configuration.port
         self.init_server(daemon=daemon)
-        self._prepare_multiprocessing()
+        self.prepare_multiprocessing()
         self.server.start()
         logger.info("Prometheus Exporter started on port %s", port)
 
@@ -116,6 +116,7 @@ def restart(self):
     def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None:
         """Updates the healthcheck functions"""
         self.healthcheck_functions = healthcheck_functions
-        self.server.shut_down()
+        if self.server and self.server.thread and self.server.thread.is_alive():
+            self.server.shut_down()
         self.init_server(daemon=daemon)
         self.run()
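Aside: update_healthchecks now shuts the server down only when its thread is actually alive, so it is safe to call before the server ever started. A simplified sketch of the liveness guard, using a stand-in server rather than logprep's real HTTP wrapper:

```python
import threading
import time


class Server:
    """Stand-in with the same thread attribute the guard inspects."""

    def __init__(self) -> None:
        self.thread: threading.Thread | None = None

    def start(self) -> None:
        self.thread = threading.Thread(target=time.sleep, args=(0.1,))
        self.thread.start()

    def shut_down(self) -> None:
        if self.thread is not None:
            self.thread.join()


def restart_server(server: Server | None) -> Server:
    # mirrors: if self.server and self.server.thread and self.server.thread.is_alive()
    if server and server.thread and server.thread.is_alive():
        server.shut_down()
    fresh = Server()
    fresh.start()
    return fresh


restart_server(None)  # no error: nothing was running yet
running = Server()
running.start()
restart_server(running)  # live server is shut down before the new one starts
```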
3 changes: 3 additions & 0 deletions logprep/run_logprep.py
@@ -82,6 +82,9 @@ def run(configs: tuple[str], version=None) -> None:
         runner = Runner.get_runner(configuration)
         logger.debug("Configuration loaded")
         runner.start()
+    except SystemExit as error:
+        logger.error(f"Error during setup: error code {error.code}")
+        sys.exit(error.code)
     # pylint: disable=broad-except
     except Exception as error:
         if os.environ.get("DEBUG", False):
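Aside: because the runner now terminates via sys.exit() once the restart budget is exhausted, the CLI catches SystemExit, logs the exit code, and re-exits with it. A self-contained sketch; the start() stand-in is hypothetical and only the except clause mirrors the diff:

```python
import logging
import sys

logger = logging.getLogger("runner")


def start() -> None:
    # hypothetical stand-in: pretend the restart count was exceeded
    sys.exit(2)


def run() -> None:
    try:
        start()
    except SystemExit as error:
        # log the code before propagating it as the process exit status
        logger.error(f"Error during setup: error code {error.code}")
        sys.exit(error.code)


if __name__ == "__main__":
    run()
```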
35 changes: 13 additions & 22 deletions logprep/runner.py
@@ -2,6 +2,7 @@
 
 # pylint: disable=logging-fstring-interpolation
 
+import atexit
 import logging
 import sys
 from importlib.metadata import version
@@ -43,8 +44,6 @@ class Runner:
     """
 
     scheduler: Scheduler
-
     _runner = None
-
     _configuration: Configuration
@@ -130,11 +129,12 @@ def get_runner(configuration: Configuration) -> "Runner":
 
     # For production, use the get_runner method to create/get access to a singleton!
     def __init__(self, configuration: Configuration) -> None:
+        self._manager: PipelineManager | None = None
+        atexit.register(self.stop_and_exit)
         self.exit_code = EXITCODES.SUCCESS
         self._configuration = configuration
         self.metrics = self.Metrics(labels={"logprep": "unset", "config": "unset"})
         self._logger = logging.getLogger("Runner")
-
         self._manager = PipelineManager(configuration)
         self.scheduler = Scheduler()
@@ -144,46 +144,37 @@ def start(self):
         This runs until an SIGTERM, SIGINT or KeyboardInterrupt signal is received, or an unhandled
         error occurs.
         """
-
         self._set_version_info_metric()
         self._schedule_config_refresh_job()
-        self._manager.restart()
+        self._manager.start()
         self._logger.info("Startup complete")
         self._logger.debug("Runner iterating")
         self._iterate()
 
     def stop_and_exit(self):
         """Stop the runner and exit the process."""
         self._logger.info("Shutting down")
-        self._manager.stop()
-        self._logger.info("Shutdown complete")
-        if self._manager.loghandler is not None:
-            self._manager.loghandler.stop()
-        sys.exit(self.exit_code.value)
+        if self._manager:
+            self._manager.stop()
 
     def _iterate(self):
         for _ in self._keep_iterating():
             if self._exit_received:
                 break
             self.scheduler.run_pending()
-            if self._should_exit():
-                self.exit_code = EXITCODES.PIPELINE_ERROR
+            if self._manager.should_exit():
+                self.exit_code = EXITCODES.PIPELINE_ERROR.value
                 self._logger.error("Restart count exceeded. Exiting.")
-                break
+                sys.exit(self.exit_code)
             self._manager.restart_failed_pipeline()
 
-    def _should_exit(self):
-        return all(
-            (
-                self._configuration.restart_count >= 0,
-                self._manager.restart_count >= self._configuration.restart_count,
-            )
-        )
-
     def reload_configuration(self):
         """Reloads the configuration"""
         try:
             self._configuration.reload()
             self._logger.info("Successfully reloaded configuration")
             self.metrics.number_of_config_refreshes += 1
-            self._manager.restart()
+            self._manager.reload()
             self._schedule_config_refresh_job()
             self._logger.info(f"Configuration version: {self._configuration.version}")
             self._set_version_info_metric()
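Aside: runner.py now registers stop_and_exit with atexit and initializes _manager to None first, so the hook is safe even when construction fails before the manager exists. A simplified sketch of that pattern with stand-in classes:

```python
import atexit
import logging

logging.basicConfig(level=logging.INFO)


class Manager:
    def stop(self) -> None:
        logging.info("Shutdown complete")


class Runner:
    def __init__(self, fail_setup: bool = False) -> None:
        self._manager: Manager | None = None
        atexit.register(self.stop_and_exit)  # runs on any interpreter exit
        if fail_setup:
            raise RuntimeError("setup failed before the manager existed")
        self._manager = Manager()

    def stop_and_exit(self) -> None:
        logging.info("Shutting down")
        if self._manager:  # guard: setup may have failed before creation
            self._manager.stop()


runner = Runner()
# on normal exit, atexit fires stop_and_exit exactly once
```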
44 changes: 37 additions & 7 deletions tests/acceptance/test_config_refresh.py
@@ -1,6 +1,8 @@
 # pylint: disable=missing-docstring
+import tempfile
 from pathlib import Path
 
+import pytest
 from ruamel.yaml import YAML
 
 from logprep.util.configuration import Configuration
@@ -9,29 +11,57 @@
 yaml = YAML(typ="safe", pure=True)
 
 
+@pytest.fixture(name="config")
+def get_config():
+    input_file = tempfile.mkstemp(suffix=".input.log")[1]
+
+    config_dict = {
+        "version": "1",
+        "process_count": 1,
+        "timeout": 0.1,
+        "profile_pipelines": False,
+        "config_refresh_interval": 5,
+        "metrics": {"enabled": False},
+        "pipeline": [],
+        "input": {
+            "file_input": {
+                "type": "file_input",
+                "logfile_path": input_file,
+                "start": "begin",
+                "interval": 1,
+                "watch_file": True,
+            }
+        },
+        "output": {
+            "jsonl_output": {
+                "type": "dummy_output",
+            }
+        },
+    }
+
+    return Configuration(**config_dict)
+
+
 def teardown_function():
     Path("generated_config.yml").unlink(missing_ok=True)
     stop_logprep()
 
 
-def test_two_times_config_refresh_after_5_seconds(tmp_path):
-    config = Configuration.from_sources(["tests/testdata/config/config.yml"])
-    config.config_refresh_interval = 5
-    config.metrics = {"enabled": False}
+def test_two_times_config_refresh_after_5_seconds(tmp_path, config):
     config_path = tmp_path / "generated_config.yml"
     config_path.write_text(config.as_json())
+    config = Configuration.from_sources([str(config_path)])
     proc = start_logprep(config_path)
     wait_for_output(proc, "Config refresh interval is set to: 5 seconds", test_timeout=5)
     config.version = "2"
     config_path.write_text(config.as_json())
     wait_for_output(proc, "Successfully reloaded configuration", test_timeout=12)
     config.version = "other version"
     config_path.write_text(config.as_json())
-    wait_for_output(proc, "Successfully reloaded configuration", test_timeout=12)
+    wait_for_output(proc, "Successfully reloaded configuration", test_timeout=20)
 
 
-def test_no_config_refresh_after_5_seconds(tmp_path):
-    config = Configuration.from_sources(["tests/testdata/config/config.yml"])
+def test_no_config_refresh_after_5_seconds(tmp_path, config):
     config.config_refresh_interval = 5
     config.metrics = {"enabled": False}
     config_path = tmp_path / "generated_config.yml"
1 change: 1 addition & 0 deletions tests/acceptance/test_full_configuration.py
@@ -166,6 +166,7 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
         assert "error" not in output.lower(), "error message"
         assert "critical" not in output.lower(), "error message"
         assert "exception" not in output.lower(), "error message"
+        assert not re.search("Shutting down", output)
         if "Startup complete" in output:
             break
         time.sleep(2)
44 changes: 25 additions & 19 deletions tests/unit/framework/test_pipeline_manager.py
@@ -147,7 +147,8 @@ def test_stop_calls_prometheus_cleanup_method(self, tmpdir):
     def test_prometheus_exporter_is_instanciated_if_metrics_enabled(self):
         config = deepcopy(self.config)
         config.metrics = MetricsConfig(enabled=True, port=8000)
-        manager = PipelineManager(config)
+        with mock.patch("logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing"):
+            manager = PipelineManager(config)
         assert isinstance(manager.prometheus_exporter, PrometheusExporter)
 
     def test_set_count_increases_number_of_pipeline_starts_metric(self):
@@ -167,15 +168,7 @@ def test_restart_calls_set_count(self):
         mock_set_count.assert_called()
         assert mock_set_count.call_count == 2
 
-    def test_restart_calls_prometheus_exporter_run(self):
-        config = deepcopy(self.config)
-        config.metrics = MetricsConfig(enabled=True, port=666)
-        pipeline_manager = PipelineManager(config)
-        pipeline_manager.prometheus_exporter = mock.MagicMock()
-        pipeline_manager.restart()
-        pipeline_manager.prometheus_exporter.run.assert_called()
-
-    def test_restart_sets_deterministic_pipline_index(self):
+    def test_restart_sets_deterministic_pipeline_index(self):
         config = deepcopy(self.config)
         config.metrics = MetricsConfig(enabled=False, port=666)
         pipeline_manager = PipelineManager(config)
@@ -207,14 +200,15 @@ def test_pipeline_manager_sets_queue_size_for_http_input(self):
                 },
             }
         }
-        PipelineManager(config)
+        PipelineManager(config).start()
         assert HttpInput.messages._maxsize == 100
         http_input = Factory.create(config.input)
         assert http_input.messages._maxsize == 100
 
     def test_pipeline_manager_setups_logging(self):
         dictConfig(DEFAULT_LOG_CONFIG)
         manager = PipelineManager(self.config)
+        manager.start()
         assert manager.loghandler is not None
         assert manager.loghandler.queue == logqueue
         assert manager.loghandler._thread is None
@@ -262,14 +256,26 @@ def test_restart_injects_healthcheck_functions(self):
         pipeline_manager.restart()
         pipeline_manager.prometheus_exporter.update_healthchecks.assert_called()
 
-    def test_restart_ensures_prometheus_exporter_is_running(self):
-        config = deepcopy(self.config)
-        config.metrics = MetricsConfig(enabled=True, port=666)
-        pipeline_manager = PipelineManager(config)
-        pipeline_manager.prometheus_exporter._prepare_multiprocessing = mock.MagicMock()
-        with mock.patch("logprep.util.http.ThreadingHTTPServer"):
-            pipeline_manager.restart()
-        pipeline_manager.prometheus_exporter.server.start.assert_called()
+    def test_reload_calls_set_count_twice(self):
+        with mock.patch.object(self.manager, "set_count") as mock_set_count:
+            self.manager.reload()
+        # drains pipelines down to 0 and scales up to 3 afterwards
+        mock_set_count.assert_has_calls([mock.call(0), mock.call(3)])
+
+    def test_should_exit_returns_bool_based_on_restart_count(self):
+        self.config.restart_count = 2
+        manager = PipelineManager(self.config)
+        assert not manager.should_exit()
+        manager.restart_count = 1
+        assert not manager.should_exit()
+        manager.restart_count = 2
+        assert manager.should_exit()
+
+    def test_stop_calls_stop_on_loghandler(self):
+        manager = PipelineManager(self.config)
+        manager.loghandler = mock.MagicMock()
+        manager.stop()
+        manager.loghandler.stop.assert_called()


class TestThrottlingQueue:
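Aside: since the manager's constructor now triggers the exporter's one-time multiprocessing setup, the tests above patch PrometheusExporter.prepare_multiprocessing before constructing it. A generic sketch of the same isolation pattern with hypothetical stand-ins:

```python
from unittest import mock


class Exporter:
    def prepare_multiprocessing(self) -> None:
        raise OSError("would touch the filesystem in a real run")


class Manager:
    def __init__(self) -> None:
        self.exporter = Exporter()
        self.exporter.prepare_multiprocessing()  # runs at construction time


# patch the one-time setup so the constructor stays side-effect free in tests
with mock.patch.object(Exporter, "prepare_multiprocessing") as prepared:
    manager = Manager()  # no filesystem access: the call is intercepted
prepared.assert_called_once()
```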
(diff for the eighth changed file was not loaded)
