diff --git a/src/karapace/coordinator/schema_coordinator.py b/src/karapace/coordinator/schema_coordinator.py index 56286537b..8ba3fbe44 100644 --- a/src/karapace/coordinator/schema_coordinator.py +++ b/src/karapace/coordinator/schema_coordinator.py @@ -515,14 +515,14 @@ async def _on_join_complete( def coordinator_dead(self) -> None: """Mark the current coordinator as dead. - NOTE: this will not force a group rejoin. If new coordinator is able to - recognize this member we will just continue with current generation. + NOTE: this will force a group rejoin. """ if self._coordinator_dead_fut is not None and self.coordinator_id is not None: LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id) self._are_we_master = False self.coordinator_id = None self._coordinator_dead_fut.set_result(None) + self.request_rejoin() def reset_generation(self) -> None: """Coordinator did not recognize either generation or member_id. Will diff --git a/src/schema_registry/registry.py b/src/schema_registry/registry.py index 9e42004a5..56768ca60 100644 --- a/src/schema_registry/registry.py +++ b/src/schema_registry/registry.py @@ -89,23 +89,22 @@ async def close(self) -> None: stack.enter_context(closing(self.schema_reader)) stack.enter_context(closing(self.producer)) - async def get_master(self, ignore_readiness: bool = False) -> PrimaryInfo: + async def get_master(self) -> PrimaryInfo: """Resolve if current node is the primary and the primary node address. - :param bool ignore_readiness: Ignore waiting to become ready and return - follower/primary state and primary url. :return (bool, Optional[str]): returns the primary/follower state and primary url """ async with self._master_lock: - while True: - primary_info = self.mc.get_master_info() - if not primary_info.primary and not primary_info.primary_url: - LOG.info("No master set: %r", primary_info) - elif not ignore_readiness and self.schema_reader.ready() is False: - LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) - else: - return primary_info - await asyncio.sleep(1.0) + primary_info = self.mc.get_master_info() + if ( + # If node is not primary and no known primary url + not primary_info.primary and primary_info.primary_url is None + ): + LOG.info("No master set: %r", primary_info) + if self.schema_reader.ready() is False: + LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) + return PrimaryInfo(False, primary_url=primary_info.primary_url) + return primary_info def get_compatibility_mode(self, subject: Subject) -> CompatibilityModes: compatibility = self.database.get_subject_compatibility(subject=subject)