Skip to content

Commit

Permalink
fix: rejoin always when dropped from coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jan 13, 2025
1 parent c196b64 commit 2259716
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,14 +515,14 @@ async def _on_join_complete(

def coordinator_dead(self) -> None:
"""Mark the current coordinator as dead.
NOTE: this will not force a group rejoin. If new coordinator is able to
recognize this member we will just continue with current generation.
NOTE: this will force a group rejoin.
"""
if self._coordinator_dead_fut is not None and self.coordinator_id is not None:
LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id)
self._are_we_master = False
self.coordinator_id = None
self._coordinator_dead_fut.set_result(None)
self.request_rejoin()

def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
Expand Down
23 changes: 11 additions & 12 deletions src/schema_registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,22 @@ async def close(self) -> None:
stack.enter_context(closing(self.schema_reader))
stack.enter_context(closing(self.producer))

async def get_master(self, ignore_readiness: bool = False) -> PrimaryInfo:
async def get_master(self) -> PrimaryInfo:
"""Resolve if current node is the primary and the primary node address.
:param bool ignore_readiness: Ignore waiting to become ready and return
follower/primary state and primary url.
:return (bool, Optional[str]): returns the primary/follower state and primary url
"""
async with self._master_lock:
while True:
primary_info = self.mc.get_master_info()
if not primary_info.primary and not primary_info.primary_url:
LOG.info("No master set: %r", primary_info)
elif not ignore_readiness and self.schema_reader.ready() is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
else:
return primary_info
await asyncio.sleep(1.0)
primary_info = self.mc.get_master_info()
if (
# If node is not primary and no known primary url
not primary_info.primary and primary_info.primary_url is None
):
LOG.info("No master set: %r", primary_info)
if self.schema_reader.ready() is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
return PrimaryInfo(False, primary_url=primary_info.primary_url)
return primary_info

def get_compatibility_mode(self, subject: Subject) -> CompatibilityModes:
compatibility = self.database.get_subject_compatibility(subject=subject)
Expand Down

0 comments on commit 2259716

Please sign in to comment.