Skip to content

Commit

Permalink
Merge pull request #762 from Aiven-Open/fix-thread-internal-override
Browse files Browse the repository at this point in the history
Rename internal variable to avoid overriding Thread._stop()
  • Loading branch information
eliax1996 authored Nov 20, 2023
2 parents b3cb8e8 + 5a95a85 commit e7f6be7
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def __init__(

# This event controls when the Reader should stop running, it will be
# set by another thread (e.g. `KarapaceSchemaRegistry`)
self._stop = Event()
self._stop_schema_reader = Event()

self.key_formatter = key_formatter

Expand All @@ -157,45 +157,45 @@ def __init__(

def close(self) -> None:
LOG.info("Closing schema_reader")
self._stop.set()
self._stop_schema_reader.set()

def run(self) -> None:
with ExitStack() as stack:
while not self._stop.is_set() and self.admin_client is None:
while not self._stop_schema_reader.is_set() and self.admin_client is None:
try:
self.admin_client = _create_admin_client_from_config(self.config)
except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
LOG.warning("[Admin Client] No Brokers available yet. Retrying")
self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
except KafkaConfigurationError:
LOG.exception("[Admin Client] Invalid configuration. Bailing")
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Admin Client] Unexpected exception. Retrying")
self.stats.unexpected_exception(ex=e, where="admin_client_instantiation")
self._stop.wait(timeout=2.0)
self._stop_schema_reader.wait(timeout=2.0)

assert self.admin_client is not None

while not self._stop.is_set() and self.consumer is None:
while not self._stop_schema_reader.is_set() and self.consumer is None:
try:
self.consumer = _create_consumer_from_config(self.config)
stack.enter_context(closing(self.consumer))
except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
LOG.warning("[Consumer] No Brokers available yet. Retrying")
self._stop.wait(timeout=2.0)
self._stop_schema_reader.wait(timeout=2.0)
except KafkaConfigurationError:
LOG.exception("[Consumer] Invalid configuration. Bailing")
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Consumer] Unexpected exception. Retrying")
self.stats.unexpected_exception(ex=e, where="consumer_instantiation")
self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)

assert self.consumer is not None

schema_topic_exists = False
while not self._stop.is_set() and not schema_topic_exists:
while not self._stop_schema_reader.is_set() and not schema_topic_exists:
try:
LOG.info("[Schema Topic] Creating %r", self.config["topic_name"])
topic = self.admin_client.new_topic(
Expand All @@ -214,12 +214,12 @@ def run(self) -> None:
"[Schema Topic] Failed to create topic %r, not enough Kafka brokers ready yet, retrying",
topic.topic,
)
self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
self._stop_schema_reader.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
except: # pylint: disable=bare-except
LOG.exception("[Schema Topic] Failed to create %r, retrying", topic.topic)
self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
self._stop_schema_reader.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)

while not self._stop.is_set():
while not self._stop_schema_reader.is_set():
if self.offset == OFFSET_UNINITIALIZED:
# Handles also a unusual case of purged schemas topic where starting offset can be > 0
# and no records to process.
Expand Down

0 comments on commit e7f6be7

Please sign in to comment.