From a52c255b2d37954cfff26a3bb045eae0083457c2 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Fri, 13 Sep 2024 16:40:03 +0200 Subject: [PATCH] fix kafka tests --- logprep/connector/confluent_kafka/input.py | 7 +++++++ logprep/connector/confluent_kafka/output.py | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 8f1a5c50f..0816ce08e 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -527,3 +527,10 @@ def health(self) -> bool: self.metrics.number_of_errors += 1 return False return super().health() + + def setup(self) -> None: + """Set the component up.""" + try: + super().setup() + except KafkaException as error: + raise FatalInputError(self, f"Could not setup kafka consumer: {error}") from error diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 85cac602a..99da911fa 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -340,3 +340,10 @@ def health(self) -> bool: self.metrics.number_of_errors += 1 return False return super().health() + + def setup(self) -> None: + """Set the component up.""" + try: + super().setup() + except KafkaException as error: + raise FatalOutputError(self, f"Could not setup kafka producer: {error}") from error