Skip to content

Commit

Permalink
WIP: fix tests adding a wait for the master to become available befor…
Browse files Browse the repository at this point in the history
…e forwarding the requests
  • Loading branch information
eliax1996 committed Aug 22, 2024
1 parent 054efb7 commit e132a1c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
6 changes: 5 additions & 1 deletion karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ def __init__(

self._metadata_snapshot: list[Assignment] = []

def is_master_assigned_to_myself(self) -> bool:
return self._are_we_master or False

def are_we_master(self) -> bool | None:
"""
After a new election its made we should wait for a while since the previous master could have produced
Expand Down Expand Up @@ -489,7 +492,8 @@ async def _on_join_complete(
else:
LOG.info(
"Starting immediately serving requests since I was master less than %s milliseconds ago, "
"no other masters could have written a new schema meanwhile"
"no other masters could have written a new schema meanwhile",
self._waiting_time_before_acting_as_master_ms,
)
elif not member_identity["master_eligibility"]:
self.master_url = None
Expand Down
25 changes: 25 additions & 0 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ def _add_schema_registry_routes(self) -> None:
schema_request=True,
auth=self._auth,
)
self.route(
"/master_available",
callback=self.master_available,
method="POST", # post because they are not cached
schema_request=True,
# auth=self._auth,
)
self.route(
"/config",
callback=self.config_set,
Expand Down Expand Up @@ -715,6 +722,24 @@ async def config_subject_delete(

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

async def master_available(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
_ = user
# self._check_authorization(user, Operation.Read, "Config:")

are_we_master, master_url = await self.schema_registry.get_master()

if (
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
):
self.r({"master_available": are_we_master}, content_type)

if master_url is None:
self.r({"master_available": False}, content_type)
else:
url = f"{master_url}/master_available"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST")

async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
deleted = request.query.get("deleted", "false").lower() == "true"
subjects = self.schema_registry.database.find_subjects(include_deleted=deleted)
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from tests.utils import repeat_until_master_is_available, repeat_until_successful_request
from typing import AsyncIterator, Iterator, List, Optional
from urllib.parse import urlparse

Expand Down Expand Up @@ -550,6 +550,7 @@ async def fixture_registry_async_client(
timeout=10,
sleep=0.3,
)
await repeat_until_master_is_available(client)
yield client
finally:
await client.close()
Expand Down
16 changes: 16 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ async def repeat_until_successful_request(
return res


async def repeat_until_master_is_available(client: Client) -> None:
while True:
res = await repeat_until_successful_request(
client.post,
"master_available",
json_data={},
headers=None,
error_msg=f"Registry API {client.server_uri} is unreachable",
timeout=10,
sleep=1,
)
reply = res.json()
if reply["master_available"] is True:
break


def write_ini(file_path: Path, ini_data: dict) -> None:
ini_contents = (f"{key}={value}" for key, value in ini_data.items())
file_contents = "\n".join(ini_contents)
Expand Down

0 comments on commit e132a1c

Please sign in to comment.