diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index 59579ec3c..9b25b3ad4 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -146,7 +146,7 @@ def __init__(
 
         # This event controls when the Reader should stop running, it will be
         # set by another thread (e.g. `KarapaceSchemaRegistry`)
-        self._stop = Event()
+        self._stop_schema_reader = Event()
 
         self.key_formatter = key_formatter
 
@@ -157,45 +157,45 @@ def __init__(
 
     def close(self) -> None:
         LOG.info("Closing schema_reader")
-        self._stop.set()
+        self._stop_schema_reader.set()
 
     def run(self) -> None:
         with ExitStack() as stack:
-            while not self._stop.is_set() and self.admin_client is None:
+            while not self._stop_schema_reader.is_set() and self.admin_client is None:
                 try:
                     self.admin_client = _create_admin_client_from_config(self.config)
                 except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
                     LOG.warning("[Admin Client] No Brokers available yet. Retrying")
-                    self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
+                    self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
                 except KafkaConfigurationError:
                     LOG.exception("[Admin Client] Invalid configuration. Bailing")
                     raise
                 except Exception as e:  # pylint: disable=broad-except
                     LOG.exception("[Admin Client] Unexpected exception. Retrying")
                     self.stats.unexpected_exception(ex=e, where="admin_client_instantiation")
-                    self._stop.wait(timeout=2.0)
+                    self._stop_schema_reader.wait(timeout=2.0)
 
             assert self.admin_client is not None
 
-            while not self._stop.is_set() and self.consumer is None:
+            while not self._stop_schema_reader.is_set() and self.consumer is None:
                 try:
                     self.consumer = _create_consumer_from_config(self.config)
                     stack.enter_context(closing(self.consumer))
                 except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
                     LOG.warning("[Consumer] No Brokers available yet. Retrying")
-                    self._stop.wait(timeout=2.0)
+                    self._stop_schema_reader.wait(timeout=2.0)
                 except KafkaConfigurationError:
                     LOG.exception("[Consumer] Invalid configuration. Bailing")
                     raise
                 except Exception as e:  # pylint: disable=broad-except
                     LOG.exception("[Consumer] Unexpected exception. Retrying")
                     self.stats.unexpected_exception(ex=e, where="consumer_instantiation")
-                    self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
+                    self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
 
             assert self.consumer is not None
 
             schema_topic_exists = False
-            while not self._stop.is_set() and not schema_topic_exists:
+            while not self._stop_schema_reader.is_set() and not schema_topic_exists:
                 try:
                     LOG.info("[Schema Topic] Creating %r", self.config["topic_name"])
                     topic = self.admin_client.new_topic(
@@ -214,12 +214,12 @@ def run(self) -> None:
                         "[Schema Topic] Failed to create topic %r, not enough Kafka brokers ready yet, retrying",
                         topic.topic,
                     )
-                    self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
+                    self._stop_schema_reader.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
                 except:  # pylint: disable=bare-except
                     LOG.exception("[Schema Topic] Failed to create %r, retrying", topic.topic)
-                    self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
+                    self._stop_schema_reader.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
 
-            while not self._stop.is_set():
+            while not self._stop_schema_reader.is_set():
                 if self.offset == OFFSET_UNINITIALIZED:
                     # Handles also a unusual case of purged schemas topic where starting offset can be > 0
                     # and no records to process.
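
The rename above does not change behavior: the reader thread uses a single threading.Event both as a stop flag (is_set()) and as an interruptible sleep (wait(timeout=...)), so retry back-off ends immediately when another thread requests shutdown via close(). Below is a minimal, self-contained sketch of that pattern; the names Worker, RETRY_TIMEOUT_SECONDS, and make_client are illustrative only and are not part of Karapace.

import threading

# Hypothetical names for illustration; Karapace's real reader thread lives in
# karapace.schema_reader and uses its own helpers and timeout constants.
RETRY_TIMEOUT_SECONDS = 2.0


def make_client():
    """Stand-in for client creation that may fail while brokers start up."""
    return object()


class Worker(threading.Thread):
    def __init__(self) -> None:
        super().__init__()
        # Descriptively named stop event, mirroring the rename in the diff;
        # another thread calls close() to request shutdown.
        self._stop_worker = threading.Event()
        self.client = None

    def close(self) -> None:
        self._stop_worker.set()

    def run(self) -> None:
        # Retry until the client exists or a stop has been requested.
        while not self._stop_worker.is_set() and self.client is None:
            try:
                self.client = make_client()
            except Exception:  # broad by design: keep retrying until stopped
                # Event.wait() doubles as an interruptible sleep: it returns
                # early the moment close() sets the event.
                self._stop_worker.wait(timeout=RETRY_TIMEOUT_SECONDS)

Typical use is w = Worker(); w.start(); ...; w.close(); w.join() -- the loop exits promptly even if it is in the middle of a back-off wait.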