Fix exporter restart #677

Merged · 7 commits · Oct 2, 2024
Changes from all commits
8 changes: 7 additions & 1 deletion examples/compose/docker-compose.yml
@@ -62,7 +62,13 @@ services:
       - ALLOW_PLAINTEXT_LISTENER=yes
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock
-    command: sh -c "((sleep 15 && echo 'kafka up' && kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic consumer)&) && /opt/bitnami/scripts/kafka/run.sh"
+    command: |
+      sh -c
+      "((sleep 15 && echo 'kafka up' &&
+      kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic consumer &&
+      kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic errors &&
+      kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic producer)&) &&
+      /opt/bitnami/scripts/kafka/run.sh"
     healthcheck:
       test:
         [
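
The new multi-line command pre-creates the consumer, errors, and producer topics in a backgrounded subshell before the broker entrypoint runs. For readers who prefer doing this from Python, a rough equivalent using confluent-kafka's AdminClient might look like the following (illustrative sketch, not part of the PR; assumes a broker on 127.0.0.1:9092):

# Illustrative sketch (not from the PR): idempotently create the three
# example topics, mirroring the kafka-topics.sh calls in the compose file.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "127.0.0.1:9092"})
new_topics = [
    NewTopic(topic, num_partitions=4, replication_factor=1)
    for topic in ("consumer", "errors", "producer")
]
for topic, future in admin.create_topics(new_topics).items():
    try:
        future.result()  # raises if the creation request failed
        print(f"created topic {topic}")
    except Exception as error:  # e.g. the topic already exists
        print(f"topic {topic}: {error}")
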
6 changes: 4 additions & 2 deletions examples/exampledata/config/http_pipeline.yml
@@ -1,7 +1,8 @@
 version: 2
-process_count: 8
-config_refresh_interval: 300
+process_count: 4
+config_refresh_interval: 5
 profile_pipelines: false
+restart_count: 3
 logger:
   level: INFO
   loggers:
@@ -47,6 +48,7 @@ input:
     /json: json
     /lab/123/(ABC|DEF)/pl.*: plaintext
     /lab/123/ABC/auditlog: jsonl
+
 output:
   kafka:
     type: confluentkafka_output
1 change: 1 addition & 0 deletions examples/exampledata/config/pipeline.yml
@@ -2,6 +2,7 @@ version: 1
 process_count: 2
 timeout: 0.1
 restart_count: 2
+config_refresh_interval: 5
 logger:
   level: INFO
   format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
2 changes: 1 addition & 1 deletion logprep/connector/jsonl/output.py
@@ -52,7 +52,7 @@ class Config(Output.Config):
     failed_events: list

     __slots__ = [
-        "ast_timeout",
+        "last_timeout",
         "events",
         "failed_events",
     ]
56 changes: 40 additions & 16 deletions logprep/framework/pipeline_manager.py
@@ -8,7 +8,6 @@
 import multiprocessing.managers
 import multiprocessing.queues
 import random
-import signal
 import time

 from attr import define, field
@@ -89,18 +88,21 @@ def __init__(self, configuration: Configuration):
         self.restart_count = 0
         self.restart_timeout_ms = random.randint(100, 1000)
         self.metrics = self.Metrics(labels={"component": "manager"})
-        self.loghandler = None
+        self.loghandler: LogprepMPQueueListener = None
+        self._error_queue: multiprocessing.Queue | None = None
+        self._configuration: Configuration = configuration
+        self._pipelines: list[multiprocessing.Process] = []
+        self.prometheus_exporter: PrometheusExporter | None = None
         if multiprocessing.current_process().name == "MainProcess":
-            self._set_http_input_queue(configuration)
             self._setup_logging()
-        self._pipelines: list[multiprocessing.Process] = []
-        self._configuration = configuration
+            self._setup_prometheus_exporter()
+            self._set_http_input_queue()

     def _setup_prometheus_exporter(self):
         prometheus_config = self._configuration.metrics
-        if prometheus_config.enabled:
+        if prometheus_config.enabled and not self.prometheus_exporter:
             self.prometheus_exporter = PrometheusExporter(prometheus_config)
-        else:
-            self.prometheus_exporter = None
+            self.prometheus_exporter.prepare_multiprocessing()

     def _setup_logging(self):
         console_logger = logging.getLogger("console")
@@ -109,12 +111,13 @@ def _setup_logging(self):
         self.loghandler = LogprepMPQueueListener(logqueue, console_handler)
         self.loghandler.start()

-    def _set_http_input_queue(self, configuration):
+    def _set_http_input_queue(self):
         """
         this workaround has to be done because the queue size is not configurable
         after initialization and the queue has to be shared between the multiple processes
         """
-        input_config = next(iter(configuration.input.values()))
+        input_config = list(self._configuration.input.values())
+        input_config = input_config[0] if input_config else {}
         is_http_input = input_config.get("type") == "http_input"
         if not is_http_input and HttpInput.messages is not None:
             return
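
The docstring above explains why the queue is built here: a multiprocessing queue's size cannot be changed after construction, and the same queue object must be inherited by every pipeline process. A minimal standalone sketch of the pattern (assumed semantics with simplified names; `message_backlog_size` is an assumption, not taken from this diff):

# Minimal sketch: build the shared, bounded queue once in the main process
# and attach it at class level before any worker processes are spawned.
import multiprocessing


class HttpInput:
    messages: "multiprocessing.Queue | None" = None


def set_http_input_queue(configuration: dict) -> None:
    input_configs = list(configuration.get("input", {}).values())
    input_config = input_configs[0] if input_configs else {}
    is_http_input = input_config.get("type") == "http_input"
    if not is_http_input and HttpInput.messages is not None:
        return
    size = input_config.get("message_backlog_size", 15000)  # assumed option name
    HttpInput.messages = multiprocessing.Queue(maxsize=size)
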
@@ -169,27 +172,39 @@ def restart_failed_pipeline(self):
         self._pipelines.insert(index, self._create_pipeline(pipeline_index))
         exit_code = failed_pipeline.exitcode
         logger.warning(
-            "Restarting failed pipeline on index %s " "with exit code: %s",
+            "Restarting failed pipeline on index %s with exit code: %s",
             pipeline_index,
             exit_code,
         )
+        if self._configuration.restart_count < 0:
+            return
         self._wait_to_restart()

     def _wait_to_restart(self):
+        self.restart_count += 1
         time.sleep(self.restart_timeout_ms / 1000)
+        self.restart_timeout_ms = self.restart_timeout_ms * 2
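
`_wait_to_restart` now implements a simple exponential backoff: the first wait is a random 100–1000 ms (set in `__init__`), and the timeout doubles after every restart. A small sketch of the resulting wait sequence:

# Sketch of the backoff from this diff: random initial wait, doubled on
# every subsequent pipeline restart.
import random
import time

restart_timeout_ms = random.randint(100, 1000)
for restart in range(1, 5):
    time.sleep(restart_timeout_ms / 1000)
    print(f"restart {restart}: waited {restart_timeout_ms} ms")
    restart_timeout_ms *= 2
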

     def stop(self):
         """Stop processing any pipelines by reducing the pipeline count to zero."""
-        self._decrease_to_count(0)
+        self.set_count(0)
         if self.prometheus_exporter:
-            self.prometheus_exporter.server.server.handle_exit(signal.SIGTERM, None)
+            self.prometheus_exporter.server.shut_down()
+            self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
+        logger.info("Shutdown complete")
+        if self.loghandler is not None:
+            self.loghandler.stop()

     def start(self):
         """Start processing."""
         self.set_count(self._configuration.process_count)

-    def restart(self, daemon=True):
+    def restart(self):
         """Restarts all pipelines"""
-        if self.prometheus_exporter:
-            self.prometheus_exporter.run(daemon=daemon)
+        self.stop()
+        self.start()
+
+    def reload(self):
+        self.set_count(0)
+        self.set_count(self._configuration.process_count)

@@ -204,3 +219,12 @@ def _create_pipeline(self, index) -> multiprocessing.Process:
         process.start()
         logger.info("Created new pipeline")
         return process
+
+    def should_exit(self) -> bool:
+        """Check if the manager should exit."""
+        return all(
+            (
+                self._configuration.restart_count >= 0,
+                self.restart_count >= self._configuration.restart_count,
+            )
+        )
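
Moving this check into the manager pairs it with the manager-owned `restart_count`. The semantics: a negative configured `restart_count` disables the limit entirely, otherwise the manager signals exit once its counter reaches the configured bound. A standalone illustration:

# Standalone illustration of the should_exit predicate from this diff.
def should_exit(configured_restart_count: int, restarts_so_far: int) -> bool:
    return all(
        (
            configured_restart_count >= 0,
            restarts_so_far >= configured_restart_count,
        )
    )

assert should_exit(-1, 100) is False  # negative bound: never exit
assert should_exit(3, 2) is False     # below the bound: keep restarting
assert should_exit(3, 3) is True      # bound reached: exit with error
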
7 changes: 4 additions & 3 deletions logprep/metrics/exporter.py
@@ -53,7 +53,7 @@ def __init__(self, configuration: MetricsConfig):
         self.healthcheck_functions = None
         self._multiprocessing_prepared = False

-    def _prepare_multiprocessing(self):
+    def prepare_multiprocessing(self):
         """
         Sets up the proper metric registry for multiprocessing and handles the necessary
         temporary multiprocessing directory that the prometheus client expects.
@@ -93,7 +93,7 @@ def run(self, daemon=True):
             return
         port = self.configuration.port
         self.init_server(daemon=daemon)
-        self._prepare_multiprocessing()
+        self.prepare_multiprocessing()
         self.server.start()
         logger.info("Prometheus Exporter started on port %s", port)

@@ -116,6 +116,7 @@ def restart(self):
     def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None:
         """Updates the healthcheck functions"""
         self.healthcheck_functions = healthcheck_functions
-        self.server.shut_down()
+        if self.server and self.server.thread and self.server.thread.is_alive():
+            self.server.shut_down()
         self.init_server(daemon=daemon)
         self.run()
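
The unconditional `shut_down()` could previously be called on a server whose thread was never started or had already exited; the new guard makes `update_healthchecks` safe to call repeatedly. The pattern in isolation (hypothetical names, not logprep's exact API surface):

# Hypothetical sketch: only shut down when a serving thread is alive,
# so repeated healthcheck updates are idempotent.
import threading
import time


class TinyServer:
    def __init__(self) -> None:
        self.thread: "threading.Thread | None" = None

    def start(self) -> None:
        self.thread = threading.Thread(target=time.sleep, args=(0.2,))
        self.thread.start()

    def shut_down(self) -> None:
        print("shutting down")


server = TinyServer()
if server and server.thread and server.thread.is_alive():
    server.shut_down()  # skipped: nothing is running yet
server.start()
if server and server.thread and server.thread.is_alive():
    server.shut_down()  # runs: the serving thread is alive
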
3 changes: 3 additions & 0 deletions logprep/run_logprep.py
@@ -82,6 +82,9 @@ def run(configs: tuple[str], version=None) -> None:
         runner = Runner.get_runner(configuration)
         logger.debug("Configuration loaded")
         runner.start()
+    except SystemExit as error:
+        logger.error(f"Error during setup: error code {error.code}")
+        sys.exit(error.code)
     # pylint: disable=broad-except
     except Exception as error:
         if os.environ.get("DEBUG", False):
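
Without this handler, a `SystemExit` raised during setup (for example by `sys.exit` deep inside configuration loading) would terminate the process before the generic `Exception` branch could log anything. A condensed sketch of the control flow (`run_setup` is a hypothetical stand-in):

# Sketch: log the exit code carried by SystemExit, then re-exit with it so
# the process status is preserved for the caller.
import logging
import sys

logger = logging.getLogger("run_logprep")


def run_setup() -> None:
    sys.exit(2)  # hypothetical setup failure


try:
    run_setup()
except SystemExit as error:
    logger.error(f"Error during setup: error code {error.code}")
    sys.exit(error.code)
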
35 changes: 13 additions & 22 deletions logprep/runner.py
@@ -2,6 +2,7 @@

 # pylint: disable=logging-fstring-interpolation

+import atexit
 import logging
 import sys
 from importlib.metadata import version
@@ -43,8 +44,6 @@ class Runner:

     """

-    scheduler: Scheduler
-
     _runner = None

     _configuration: Configuration
@@ -130,11 +129,12 @@ def get_runner(configuration: Configuration) -> "Runner":

     # For production, use the get_runner method to create/get access to a singleton!
     def __init__(self, configuration: Configuration) -> None:
+        self._manager: PipelineManager | None = None
+        atexit.register(self.stop_and_exit)
         self.exit_code = EXITCODES.SUCCESS
         self._configuration = configuration
         self.metrics = self.Metrics(labels={"logprep": "unset", "config": "unset"})
         self._logger = logging.getLogger("Runner")
-
         self._manager = PipelineManager(configuration)
         self.scheduler = Scheduler()
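
Note the ordering in `__init__`: `self._manager` is set to `None` before `atexit.register`, because the atexit hook can fire even when construction fails halfway through, so `stop_and_exit` must tolerate a missing manager. A reduced sketch of that defensive pattern:

# Reduced sketch: the atexit hook may run even if __init__ raised after
# registration, so the handler checks for a half-built object.
import atexit


class Runner:
    def __init__(self, fail: bool = False) -> None:
        self._manager = None  # set first, before the hook can ever fire
        atexit.register(self.stop_and_exit)
        if fail:
            raise RuntimeError("setup failed before the manager existed")
        self._manager = object()  # stand-in for PipelineManager(configuration)

    def stop_and_exit(self) -> None:
        if self._manager:  # None when setup failed: nothing to stop
            print("stopping pipeline manager")
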

@@ -144,46 +144,37 @@ def start(self):
         This runs until an SIGTERM, SIGINT or KeyboardInterrupt signal is received, or an unhandled
         error occurs.
         """
-
         self._set_version_info_metric()
         self._schedule_config_refresh_job()
-        self._manager.restart()
+        self._manager.start()
         self._logger.info("Startup complete")
         self._logger.debug("Runner iterating")
         self._iterate()

     def stop_and_exit(self):
         """Stop the runner and exit the process."""
         self._logger.info("Shutting down")
-        self._manager.stop()
-        self._logger.info("Shutdown complete")
-        if self._manager.loghandler is not None:
-            self._manager.loghandler.stop()
-        sys.exit(self.exit_code.value)
+        if self._manager:
+            self._manager.stop()

     def _iterate(self):
         for _ in self._keep_iterating():
             if self._exit_received:
                 break
             self.scheduler.run_pending()
-            if self._should_exit():
-                self.exit_code = EXITCODES.PIPELINE_ERROR
+            if self._manager.should_exit():
+                self.exit_code = EXITCODES.PIPELINE_ERROR.value
                 self._logger.error("Restart count exceeded. Exiting.")
-                break
+                sys.exit(self.exit_code)
             self._manager.restart_failed_pipeline()

-    def _should_exit(self):
-        return all(
-            (
-                self._configuration.restart_count >= 0,
-                self._manager.restart_count >= self._configuration.restart_count,
-            )
-        )
-
     def reload_configuration(self):
         """Reloads the configuration"""
         try:
             self._configuration.reload()
             self._logger.info("Successfully reloaded configuration")
             self.metrics.number_of_config_refreshes += 1
-            self._manager.restart()
+            self._manager.reload()
             self._schedule_config_refresh_job()
             self._logger.info(f"Configuration version: {self._configuration.version}")
             self._set_version_info_metric()
3 changes: 2 additions & 1 deletion logprep/util/http.py
@@ -1,5 +1,6 @@
 """logprep http utils"""

+import atexit
 import inspect
 import json
 import logging
@@ -52,7 +53,7 @@ def __init__(
         logger_name: str
             Name of the logger instance
         """
-
+        atexit.register(self.shut_down, wait=0.1)
         if (
             hasattr(self, "thread")
             and self.thread is not None
3 changes: 2 additions & 1 deletion logprep/util/logging.py
@@ -47,5 +47,6 @@ def start(self):

     def stop(self):
         self.enqueue_sentinel()
-        self._process.join()
+        if self._process and hasattr(self._process, "join"):
+            self._process.join()
         self._process = None
44 changes: 37 additions & 7 deletions tests/acceptance/test_config_refresh.py
@@ -1,6 +1,8 @@
 # pylint: disable=missing-docstring
 import tempfile
 from pathlib import Path

+import pytest
 from ruamel.yaml import YAML

 from logprep.util.configuration import Configuration
@@ -9,29 +11,57 @@
 yaml = YAML(typ="safe", pure=True)


+@pytest.fixture(name="config")
+def get_config():
+    input_file = tempfile.mkstemp(suffix=".input.log")[1]
+
+    config_dict = {
+        "version": "1",
+        "process_count": 1,
+        "timeout": 0.1,
+        "profile_pipelines": False,
+        "config_refresh_interval": 5,
+        "metrics": {"enabled": False},
+        "pipeline": [],
+        "input": {
+            "file_input": {
+                "type": "file_input",
+                "logfile_path": input_file,
+                "start": "begin",
+                "interval": 1,
+                "watch_file": True,
+            }
+        },
+        "output": {
+            "jsonl_output": {
+                "type": "dummy_output",
+            }
+        },
+    }
+
+    return Configuration(**config_dict)
+
+
 def teardown_function():
     Path("generated_config.yml").unlink(missing_ok=True)
     stop_logprep()


-def test_two_times_config_refresh_after_5_seconds(tmp_path):
-    config = Configuration.from_sources(["tests/testdata/config/config.yml"])
-    config.config_refresh_interval = 5
-    config.metrics = {"enabled": False}
+def test_two_times_config_refresh_after_5_seconds(tmp_path, config):
     config_path = tmp_path / "generated_config.yml"
     config_path.write_text(config.as_json())
-    config = Configuration.from_sources([str(config_path)])
     proc = start_logprep(config_path)
     wait_for_output(proc, "Config refresh interval is set to: 5 seconds", test_timeout=5)
     config.version = "2"
     config_path.write_text(config.as_json())
     wait_for_output(proc, "Successfully reloaded configuration", test_timeout=12)
     config.version = "other version"
     config_path.write_text(config.as_json())
-    wait_for_output(proc, "Successfully reloaded configuration", test_timeout=12)
+    wait_for_output(proc, "Successfully reloaded configuration", test_timeout=20)


-def test_no_config_refresh_after_5_seconds(tmp_path):
-    config = Configuration.from_sources(["tests/testdata/config/config.yml"])
+def test_no_config_refresh_after_5_seconds(tmp_path, config):
     config.config_refresh_interval = 5
     config.metrics = {"enabled": False}
1 change: 1 addition & 0 deletions tests/acceptance/test_full_configuration.py
@@ -166,6 +166,7 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
             assert "error" not in output.lower(), "error message"
             assert "critical" not in output.lower(), "error message"
             assert "exception" not in output.lower(), "error message"
+            assert not re.search("Shutting down", output)
             if "Startup complete" in output:
                 break
             time.sleep(2)