diff --git a/karapace/coordinator/schema_coordinator.py b/karapace/coordinator/schema_coordinator.py index 852c63e8f..c36e60c9e 100644 --- a/karapace/coordinator/schema_coordinator.py +++ b/karapace/coordinator/schema_coordinator.py @@ -191,6 +191,9 @@ def __init__( self._metadata_snapshot: list[Assignment] = [] + def is_master_assigned_to_myself(self) -> bool: + return self._are_we_master or False + def are_we_master(self) -> bool | None: """ After a new election its made we should wait for a while since the previous master could have produced @@ -489,7 +492,8 @@ async def _on_join_complete( else: LOG.info( "Starting immediately serving requests since I was master less than %s milliseconds ago, " - "no other masters could have written a new schema meanwhile" + "no other masters could have written a new schema meanwhile", + self._waiting_time_before_acting_as_master_ms, ) elif not member_identity["master_eligibility"]: self.master_url = None diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index d3b90dac6..b2df5bcfb 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -200,6 +200,13 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, auth=self._auth, ) + self.route( + "/master_available", + callback=self.master_available, + method="POST", # post because they are not cached + schema_request=True, + # auth=self._auth, + ) self.route( "/config", callback=self.config_set, @@ -715,6 +722,24 @@ async def config_subject_delete( self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type) + async def master_available(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None: + _ = user + # self._check_authorization(user, Operation.Read, "Config:") + + are_we_master, master_url = await self.schema_registry.get_master() + + if ( + self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access + and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access + ): + self.r({"master_available": are_we_master}, content_type) + + if master_url is None: + self.r({"master_available": False}, content_type) + else: + url = f"{master_url}/master_available" + await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST") + async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None: deleted = request.query.get("deleted", "false").lower() == "true" subjects = self.schema_registry.database.find_subjects(include_deleted=deleted) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6ed4f4b22..862b99fef 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -31,7 +31,7 @@ from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk -from tests.utils import repeat_until_successful_request +from tests.utils import repeat_until_master_is_available, repeat_until_successful_request from typing import AsyncIterator, Iterator, List, Optional from urllib.parse import urlparse @@ -550,6 +550,7 @@ async def fixture_registry_async_client( timeout=10, sleep=0.3, ) + await repeat_until_master_is_available(client) yield client finally: await client.close() diff --git a/tests/utils.py b/tests/utils.py index f38097858..2c5dd4ebf 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -291,6 +291,22 @@ async def repeat_until_successful_request( return res +async def repeat_until_master_is_available(client: Client) -> None: + while True: + res = await repeat_until_successful_request( + client.post, + "master_available", + json_data={}, + headers=None, + error_msg=f"Registry API {client.server_uri} is unreachable", + timeout=10, + sleep=1, + ) + reply = res.json() + if reply["master_available"] is True: + break + + def write_ini(file_path: Path, ini_data: dict) -> None: ini_contents = (f"{key}={value}" for key, value in ini_data.items()) file_contents = "\n".join(ini_contents)