Skip to content

Commit

Permalink
WIP: fix tests adding a wait for the master to become available befor…
Browse files Browse the repository at this point in the history
…e forwarding the requests
  • Loading branch information
eliax1996 committed Aug 22, 2024
1 parent 054efb7 commit ca1c8e8
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 6 deletions.
8 changes: 6 additions & 2 deletions karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ def __init__(

self._metadata_snapshot: list[Assignment] = []

def is_master_assigned_to_myself(self) -> bool:
return self._are_we_master or False

def are_we_master(self) -> bool | None:
"""
After a new election its made we should wait for a while since the previous master could have produced
Expand Down Expand Up @@ -484,12 +487,13 @@ async def _on_join_complete(
LOG.info(
"Declaring myself as not master for %s milliseconds, "
"another master meanwhile could have added other records",
self._waiting_time_before_acting_as_master_ms,
self._waiting_time_before_acting_as_master_ms
)
else:
LOG.info(
"Starting immediately serving requests since I was master less than %s milliseconds ago, "
"no other masters could have written a new schema meanwhile"
"no other masters could have written a new schema meanwhile",
self._waiting_time_before_acting_as_master_ms
)
elif not member_identity["master_eligibility"]:
self.master_url = None
Expand Down
25 changes: 25 additions & 0 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ def _add_schema_registry_routes(self) -> None:
schema_request=True,
auth=self._auth,
)
self.route(
"/master_available",
callback=self.master_available,
method="POST", # post because they are not cached
schema_request=True,
# auth=self._auth,
)
self.route(
"/config",
callback=self.config_set,
Expand Down Expand Up @@ -715,6 +722,24 @@ async def config_subject_delete(

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

async def master_available(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
_ = user
# self._check_authorization(user, Operation.Read, "Config:")

are_we_master, master_url = await self.schema_registry.get_master()

if (
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
):
self.r({"master_available": are_we_master}, content_type)

if master_url is None:
self.r({"master_available": False}, content_type)
else:
url = f"{master_url}/master_available"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST")

async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
deleted = request.query.get("deleted", "false").lower() == "true"
subjects = self.schema_registry.database.find_subjects(include_deleted=deleted)
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from tests.utils import repeat_until_master_is_available, repeat_until_successful_request
from typing import AsyncIterator, Iterator, List, Optional
from urllib.parse import urlparse

Expand Down Expand Up @@ -550,6 +550,7 @@ async def fixture_registry_async_client(
timeout=10,
sleep=0.3,
)
await repeat_until_master_is_available(client)
yield client
finally:
await client.close()
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/test_schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ async def test_coordinator_workflow(

now = time.time()
while time.time() - now < waiting_time_before_acting_as_master_sec:
assert not secondary.are_we_master(), (
f"Cannot become master before {waiting_time_before_acting_as_master_sec} seconds "
f"for the late records that can arrive from the previous master"
assert (
not secondary.are_we_master()
, (f"Cannot become master before {waiting_time_before_acting_as_master_sec} seconds "
f"for the late records that can arrive from the previous master")
)
await asyncio.sleep(0.5)

Expand Down
16 changes: 16 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ async def repeat_until_successful_request(
return res


async def repeat_until_master_is_available(client: Client) -> None:
while True:
res = await repeat_until_successful_request(
client.post,
"master_available",
json_data={},
headers=None,
error_msg=f"Registry API {client.server_uri} is unreachable",
timeout=10,
sleep=1,
)
reply = res.json()
if reply["master_available"] is True:
break


def write_ini(file_path: Path, ini_data: dict) -> None:
ini_contents = (f"{key}={value}" for key, value in ini_data.items())
file_contents = "\n".join(ini_contents)
Expand Down

0 comments on commit ca1c8e8

Please sign in to comment.