Commit

Merge branch 'main' into avro_references2
libretto committed Jan 14, 2025
2 parents 22432f7 + 2da698f commit 3b92077
Showing 19 changed files with 414 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/container.yml
@@ -33,7 +33,7 @@ jobs:
run: |
echo is_release=${{ contains(github.ref, 'refs/tags/') }} | tee -a $GITHUB_OUTPUT
echo is_dev=${{ ! contains(github.ref, 'refs/tags/') }} | tee -a $GITHUB_OUTPUT
echo version=$(git describe --always --tags) | tee -a $GITHUB_OUTPUT
echo version=$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g') | tee -a $GITHUB_OUTPUT
# QEMU is used to set up VMs for building non-x86_64 images.
- name: Set up QEMU
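For context on the new version step above: it turns the `git describe --tags` output into a PEP 440-style dev version string. A rough Python equivalent of the shell pipeline, using an assumed example describe output (illustration only, not part of the workflow):

# Assumed example: `git describe --tags` output 25 commits after tag 4.1.0
describe = "4.1.0-25-gabc1234"
# `cut -d '-' -f -2` keeps the first two dash-separated fields ("4.1.0-25"),
# `sed 's/-/.dev/g'` then rewrites the remaining dash, giving "4.1.0.dev25".
version = ".dev".join(describe.split("-")[:2])
print(version)  # -> 4.1.0.dev25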
5 changes: 5 additions & 0 deletions README.rst
@@ -491,6 +491,11 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``log_format``
- ``%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s``
- Log format
* - ``waiting_time_before_acting_as_master_ms``
- ``5000``
- The time (in milliseconds) a newly elected master waits before becoming an active master, unless it was already the master in the previous election round, in which case the wait is skipped.
Should be an upper bound of the time required for a master to write a message to the Kafka topic plus the time required for a node in the cluster to consume the log of messages.
If the value is too low, there is a risk under high load of producing different schemas with the same ID.
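For illustration only (the key name comes from this change; the value is an arbitrary example): a slow or heavily loaded cluster can be given more headroom by raising the option in the configuration.

# Hypothetical override merged into the karapace configuration defaults
config_overrides = {
    "waiting_time_before_acting_as_master_ms": 10000,  # default is 5000
}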


Authentication and authorization of Karapace Schema Registry REST API
11 changes: 6 additions & 5 deletions container/Dockerfile
@@ -39,10 +39,11 @@ RUN --mount=type=cache,target=/root/.cache/pip \
FROM python:3.10.11-slim-bullseye AS karapace

# Setup user and directories.
RUN groupadd --system karapace \
&& useradd --system --gid karapace karapace \
# https://docs.redhat.com/en/documentation/openshift_container_platform/4.17/html/images/creating-images#use-uid_create-images
RUN useradd --system --gid 0 karapace \
&& mkdir /opt/karapace /opt/karapace/runtime /var/log/karapace \
&& chown --recursive karapace:karapace /opt/karapace /var/log/karapace
&& chgrp -R 0 /opt/karapace /opt/karapace/runtime /var/log/karapace \
&& chmod -R g+rwX /opt/karapace

# Install protobuf compiler.
ARG PROTOBUF_COMPILER_VERSION="3.12.4-1+deb11u1"
@@ -56,8 +57,8 @@ COPY --from=builder /venv /venv
ENV PATH="/venv/bin:$PATH"

COPY ./container/start.sh /opt/karapace
RUN chmod 500 /opt/karapace/start.sh \
&& chown karapace:karapace /opt/karapace/start.sh
RUN chmod 550 /opt/karapace/start.sh \
&& chgrp -R 0 /opt/karapace/start.sh

COPY ./container/healthcheck.py /opt/karapace

8 changes: 4 additions & 4 deletions go/protopace/go.mod
@@ -29,12 +29,12 @@ require (
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
16 changes: 8 additions & 8 deletions go/protopace/go.sum
@@ -57,18 +57,18 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c h1:e0zB268kOca6FbuJkYUGxfwG4DKFZG/8DLyv9Zv66cE=
google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c/go.mod h1:fO8wJzT2zbQbAjbIoos1285VfEIYKDDY+Dt+WpTkh6g=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c h1:Kqjm4WpoWvwhMPcrAczoTyMySQmYa9Wy2iL6Con4zn8=
2 changes: 2 additions & 0 deletions src/karapace/config.py
@@ -85,6 +85,7 @@ class Config(TypedDict):
kafka_schema_reader_strict_mode: bool
kafka_retriable_errors_silenced: bool
use_protobuf_formatter: bool
waiting_time_before_acting_as_master_ms: int

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
@@ -163,6 +164,7 @@ class ConfigDefaults(Config, total=False):
"kafka_schema_reader_strict_mode": False,
"kafka_retriable_errors_silenced": True,
"use_protobuf_formatter": False,
"waiting_time_before_acting_as_master_ms": 5000,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

83 changes: 68 additions & 15 deletions src/karapace/coordinator/master_coordinator.py
@@ -13,25 +13,42 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from threading import Thread
from typing import Final

import asyncio
import logging
import time

__all__ = ("MasterCoordinator",)


LOG = logging.getLogger(__name__)


class MasterCoordinator:
"""Handles primary election"""
"""Handles primary election
The coordination runs in its own dedicated thread. Under stress the main
event loop can have a long queue of items to work through, and a dedicated thread
gives the coordination tasks more runtime, since the Python interpreter switches
the active thread at the configured thread switch interval (5 milliseconds by
default in CPython).
"""

def __init__(self, config: Config) -> None:
super().__init__()
self._config: Final = config
self._kafka_client: AIOKafkaClient | None = None
self._running = True
self._sc: SchemaCoordinator | None = None
self._closing = asyncio.Event()
self._thread: Thread = Thread(target=self._start_loop, daemon=True)
self._loop: asyncio.AbstractEventLoop | None = None
self._schema_reader_stopper: SchemaReaderStoppper | None = None

def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None:
self._schema_reader_stopper = schema_reader_stopper

@property
def schema_coordinator(self) -> SchemaCoordinator | None:
@@ -41,7 +58,18 @@ def schema_coordinator(self) -> SchemaCoordinator | None:
def config(self) -> Config:
return self._config

async def start(self) -> None:
def start(self) -> None:
self._thread.start()

def _start_loop(self) -> None:
# avoid reassigning the loop, otherwise we leak resources
assert self._loop is None, "Loop already started"
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._async_loop())
self._loop.run_forever()

async def _async_loop(self) -> None:
self._kafka_client = self.init_kafka_client()
# Wait until schema coordinator is ready.
# This probably needs better synchronization than plain waits.
@@ -61,10 +89,22 @@ async def start(self) -> None:
await asyncio.sleep(0.5)

self._sc = self.init_schema_coordinator()
while True:
if self._sc.ready():
return
await asyncio.sleep(0.5)

# Keep this thread alive until shutdown: the schema_coordinator must stay
# running, since it contains the `heartbeat` and coordination logic.
await self._closing.wait()

LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
while self._loop is not None and not self._loop.is_closed():
self._loop.stop()
if not self._loop.is_running():
self._loop.close()
time.sleep(0.5)
if self._kafka_client:
await self._kafka_client.close()

def init_kafka_client(self) -> AIOKafkaClient:
ssl_context = create_ssl_context(
@@ -90,15 +130,18 @@ def init_kafka_client(self) -> AIOKafkaClient:

def init_schema_coordinator(self) -> SchemaCoordinator:
assert self._kafka_client is not None
assert self._schema_reader_stopper is not None
schema_coordinator = SchemaCoordinator(
client=self._kafka_client,
schema_reader_stopper=self._schema_reader_stopper,
election_strategy=self._config.get("master_election_strategy", "lowest"),
group_id=self._config["group_id"],
hostname=self._config["advertised_hostname"],
master_eligibility=self._config["master_eligibility"],
port=self._config["advertised_port"],
scheme=self._config["advertised_protocol"],
session_timeout_ms=self._config["session_timeout_ms"],
waiting_time_before_acting_as_master_ms=self._config["waiting_time_before_acting_as_master_ms"],
)
schema_coordinator.start()
return schema_coordinator
@@ -107,7 +150,7 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
assert self._sc is not None
generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID
return SchemaCoordinatorStatus(
is_primary=self._sc.are_we_master if self._sc is not None else None,
is_primary=self._sc.are_we_master() if self._sc is not None else None,
is_primary_eligible=self._config["master_eligibility"],
primary_url=self._sc.master_url if self._sc is not None else None,
is_running=True,
@@ -116,12 +159,22 @@ def get_master_info(self) -> tuple[bool | None, str | None]:

def get_master_info(self) -> tuple[bool | None, str | None]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
assert self._sc is not None
return self._sc.are_we_master, self._sc.master_url
if not self._sc:
return False, None

if not self._sc.ready():
# After being elected master we wait for a while, and we also consume all the
# messages in the log before proceeding; see the docstring of
# `self._sc.are_we_master` for more details.
return False, None

return self._sc.are_we_master(), self._sc.master_url

def __send_close_event(self) -> None:
self._closing.set()

async def close(self) -> None:
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._kafka_client:
await self._kafka_client.close()
LOG.info("Sending the close signal to the master coordinator thread")
if self._loop is None:
raise ValueError("Cannot stop the loop before `.start()` is called")
self._loop.call_soon_threadsafe(self.__send_close_event)
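The threading pattern introduced in this file — a private asyncio event loop owned by a daemon thread, with shutdown handed over via `call_soon_threadsafe` — can be reduced to a minimal, self-contained sketch. The names and structure below are illustrative, not the actual MasterCoordinator:

import asyncio
from threading import Thread


class LoopInThread:
    """Minimal sketch: run coordination work on an event loop owned by a dedicated thread."""

    def __init__(self) -> None:
        self._loop: asyncio.AbstractEventLoop | None = None
        self._closing = asyncio.Event()
        self._thread = Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # The dedicated thread creates and owns the loop; no other thread runs it.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self._loop.create_task(self._main())
        self._loop.run_forever()
        self._loop.close()

    async def _main(self) -> None:
        # Stand-in for the coordination work (heartbeats etc.); block until asked to close.
        await self._closing.wait()
        self._loop.stop()

    def close(self) -> None:
        # Called from another thread: hand the shutdown signal to the loop thread-safely.
        assert self._loop is not None, "close() called before start()"
        self._loop.call_soon_threadsafe(self._closing.set)
        self._thread.join()

Only the owning thread ever runs the loop; other threads never touch it directly and only schedule callbacks through `call_soon_threadsafe`, which is what the new `close()` above does with `__send_close_event`.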