Skip to content

Commit

Permalink
Added support to other naming strategy, refactored different unrelate…
Browse files Browse the repository at this point in the history
…d stuff and added a couple of tests
  • Loading branch information
eliax1996 committed Nov 3, 2023
1 parent 101d69e commit 3471482
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 97 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- ``runtime``
- Runtime directory for the ``protoc`` protobuf schema parser and code generator
* - ``name_strategy``
- ``topic_name``
- ``topic_name``, ``record_name``, ``topic_record_name``
- Name strategy to use when storing schemas from the kafka rest proxy service
* - ``name_strategy_validation``
- ``true``
Expand Down
20 changes: 17 additions & 3 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,31 @@ class InvalidConfiguration(Exception):
pass


class StrEnum(str, Enum):
def __str__(self) -> str:
return str(self.value)


@unique
class ElectionStrategy(Enum):
highest = "highest"
lowest = "lowest"


@unique
class NameStrategy(Enum):
class NameStrategy(StrEnum):
topic_name = "topic_name"
record_name = "record_name"
topic_record_name = "topic_record_name"


@unique
class SubjectType(StrEnum):
key = "key"
value = "value"
partition = "partition"


def parse_env_value(value: str) -> str | int | bool:
# we only have ints, strings and bools in the config
try:
Expand Down Expand Up @@ -273,8 +285,10 @@ def validate_config(config: Config) -> None:
try:
NameStrategy(name_strategy)
except ValueError:
valid_strategies = [strategy.value for strategy in NameStrategy]
raise InvalidConfiguration(f"Invalid name strategy: {name_strategy}, valid values are {valid_strategies}") from None
valid_strategies = list(NameStrategy)
raise InvalidConfiguration(
f"Invalid default name strategy: {name_strategy}, valid values are {valid_strategies}"
) from None

if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None:
raise InvalidConfiguration(
Expand Down
58 changes: 37 additions & 21 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from karapace.config import Config, create_client_ssl_context
from karapace.config import Config, create_client_ssl_context, NameStrategy, SubjectType
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.authentication import (
Expand All @@ -28,7 +28,13 @@
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError
from karapace.serialization import (
get_subject_name,
InvalidMessageSchema,
InvalidPayload,
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import SchemaId, Subject
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union
Expand All @@ -39,7 +45,7 @@
import logging
import time

RECORD_KEYS = ["key", "value", "partition"]
SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value]
PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"}
RECORD_CODES = [42201, 42202]
KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"}
Expand Down Expand Up @@ -439,6 +445,7 @@ def __init__(

self._async_producer_lock = asyncio.Lock()
self._async_producer: Optional[AIOKafkaProducer] = None
self.naming_strategy = NameStrategy(self.config["name_strategy"])

def __str__(self) -> str:
return f"UserRestProxy(username={self.config['sasl_plain_username']})"
Expand Down Expand Up @@ -759,7 +766,7 @@ async def get_schema_id(
self,
data: dict,
topic: str,
prefix: str,
subject_type: SubjectType,
schema_type: SchemaType,
) -> SchemaId:
"""
Expand All @@ -770,21 +777,27 @@ async def get_schema_id(
"""
log.debug("[resolve schema id] Retrieving schema id for %r", data)
schema_id: Union[SchemaId, None] = (
SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None
SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None
)
schema_str = data.get(f"{prefix}_schema")
schema_str = data.get(f"{subject_type}_schema")

if schema_id is None and schema_str is None:
raise InvalidSchema()

if schema_id is None:
parsed_schema = ValidatedTypedSchema.parse(schema_type, schema_str)
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)

subject_name = get_subject_name(
topic,
parsed_schema,
subject_type,
self.naming_strategy,
)
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
else:

def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type)
subject = get_subject_name(topic, schema, subject_type, self.naming_strategy)
return subject not in subjects

parsed_schema, valid_subjects = await self._query_schema_and_subjects(
Expand Down Expand Up @@ -833,7 +846,9 @@ async def _query_schema_id_from_cache_or_registry(
)
return schema_id

async def validate_schema_info(self, data: dict, prefix: str, content_type: str, topic: str, schema_type: str):
async def validate_schema_info(
self, data: dict, subject_type: SubjectType, content_type: str, topic: str, schema_type: str
):
try:
schema_type = SCHEMA_MAPPINGS[schema_type]
except KeyError:
Expand All @@ -848,7 +863,7 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str,

# will do in place updates of id keys, since calling these twice would be expensive
try:
data[f"{prefix}_schema_id"] = await self.get_schema_id(data, topic, prefix, schema_type)
data[f"{subject_type}_schema_id"] = await self.get_schema_id(data, topic, subject_type, schema_type)
except InvalidPayload:
log.exception("Unable to retrieve schema id")
KafkaRest.r(
Expand All @@ -863,16 +878,17 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str,
KafkaRest.r(
body={
"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value,
"message": f"Error when registering schema. format = {schema_type.value}, subject = {topic}-{prefix}",
"message": f"Error when registering schema."
f"format = {schema_type.value}, subject = {topic}-{subject_type}",
},
content_type=content_type,
status=HTTPStatus.REQUEST_TIMEOUT,
)
except InvalidSchema:
if f"{prefix}_schema" in data:
err = f'schema = {data[f"{prefix}_schema"]}'
if f"{subject_type}_schema" in data:
err = f'schema = {data[f"{subject_type}_schema"]}'
else:
err = f'schema_id = {data[f"{prefix}_schema_id"]}'
err = f'schema_id = {data[f"{subject_type}_schema_id"]}'
KafkaRest.r(
body={
"error_code": RESTErrorCodes.INVALID_DATA.value,
Expand Down Expand Up @@ -1002,26 +1018,26 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
status=HTTPStatus.BAD_REQUEST,
)
convert_to_int(r, "partition", content_type)
if set(r.keys()).difference(RECORD_KEYS):
if set(r.keys()).difference({subject_type.value for subject_type in SubjectType}):
KafkaRest.unprocessable_entity(
message="Invalid request format",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
# disallow missing id and schema for any key/value list that has at least one populated element
if formats["embedded_format"] in {"avro", "jsonschema", "protobuf"}:
for prefix, code in zip(RECORD_KEYS, RECORD_CODES):
if self.all_empty(data, prefix):
for subject_type, code in zip(SUBJECT_VALID_POSTFIX, RECORD_CODES):
if self.all_empty(data, subject_type):
continue
if not self.is_valid_schema_request(data, prefix):
if not self.is_valid_schema_request(data, subject_type):
KafkaRest.unprocessable_entity(
message=f"Request includes {prefix}s and uses a format that requires schemas "
f"but does not include the {prefix}_schema or {prefix}_schema_id fields",
message=f"Request includes {subject_type}s and uses a format that requires schemas "
f"but does not include the {subject_type}_schema or {subject_type.value}_schema_id fields",
content_type=content_type,
sub_code=code,
)
try:
await self.validate_schema_info(data, prefix, content_type, topic, formats["embedded_format"])
await self.validate_schema_info(data, subject_type, content_type, topic, formats["embedded_format"])
except InvalidMessageSchema as e:
KafkaRest.unprocessable_entity(
message=str(e),
Expand Down
35 changes: 27 additions & 8 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from avro.schema import Schema as AvroSchema
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
Expand Down Expand Up @@ -58,6 +59,13 @@
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


class MessageType(Enum):
config = "CONFIG"
schema = "SCHEMA"
delete_subject = "DELETE_SUBJECT"
no_operation = "NOOP"


def _create_consumer_from_config(config: Config) -> KafkaConsumer:
# Group not set on purpose, all consumers read the same data
session_timeout_ms = config["session_timeout_ms"]
Expand Down Expand Up @@ -522,14 +530,25 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

def handle_msg(self, key: dict, value: dict | None) -> None:
if key["keytype"] == "CONFIG":
self._handle_msg_config(key, value)
elif key["keytype"] == "SCHEMA":
self._handle_msg_schema(key, value)
elif key["keytype"] == "DELETE_SUBJECT":
self._handle_msg_delete_subject(key, value)
elif key["keytype"] == "NOOP": # for spec completeness
pass
if "keytype" in key:
try:
message_type = MessageType(key["keytype"])

if message_type == MessageType.config:
self._handle_msg_config(key, value)
elif message_type == MessageType.schema:
self._handle_msg_schema(key, value)
elif message_type == MessageType.delete_subject:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.no_operation:
pass
except ValueError:
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])

else:
LOG.error(
"The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
)

def remove_referenced_by(
self,
Expand Down
54 changes: 27 additions & 27 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from google.protobuf.message import DecodeError
from jsonschema import ValidationError
from karapace.client import Client
from karapace.config import NameStrategy
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences
from karapace.kafka_rest_apis import SubjectType
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
Expand Down Expand Up @@ -72,9 +74,9 @@ def topic_record_name_strategy(topic_name: str, record_name: str) -> str:


NAME_STRATEGIES = {
"topic_name": topic_name_strategy,
"record_name": record_name_strategy,
"topic_record_name": topic_record_name_strategy,
NameStrategy.topic_name: topic_name_strategy,
NameStrategy.record_name: record_name_strategy,
NameStrategy.topic_record_name: topic_record_name_strategy,
}


Expand Down Expand Up @@ -103,7 +105,7 @@ async def post_new_schema(
raise SchemaRetrievalError(result.json())
return SchemaId(result.json()["id"])

async def _get_schema_r(
async def _get_schema_recursive(
self,
subject: Subject,
explored_schemas: Set[Tuple[Subject, Optional[ResolvedVersion]]],
Expand Down Expand Up @@ -131,7 +133,7 @@ async def _get_schema_r(
references = [Reference.from_dict(data) for data in json_result["references"]]
dependencies = {}
for reference in references:
_, schema, version = await self._get_schema_r(reference.subject, explored_schemas, reference.version)
_, schema, version = await self._get_schema_recursive(reference.subject, explored_schemas, reference.version)
dependencies[reference.name] = Dependency(
name=reference.name, subject=reference.subject, version=version, target_schema=schema
)
Expand Down Expand Up @@ -174,7 +176,7 @@ async def get_schema(
- ValidatedTypedSchema: The retrieved schema, validated and typed.
- ResolvedVersion: The version of the schema that was retrieved.
"""
return await self._get_schema_r(subject, set(), version)
return await self._get_schema_recursive(subject, set(), version)

async def get_schema_for_id(self, schema_id: SchemaId) -> Tuple[TypedSchema, List[Subject]]:
result = await self.client.get(f"schemas/ids/{schema_id}", params={"includeSubjects": "True"})
Expand Down Expand Up @@ -225,6 +227,25 @@ async def close(self):
await self.client.close()


def get_subject_name(
topic_name: str,
schema: TypedSchema,
subject_type: SubjectType,
naming_strategy: NameStrategy,
) -> Subject:
namespace = "dummy"
if schema.schema_type is SchemaType.AVRO:
if isinstance(schema.schema, avro.schema.NamedSchema):
namespace = schema.schema.namespace or ""
if schema.schema_type is SchemaType.JSONSCHEMA:
namespace = schema.to_dict().get("namespace", "dummy")
# Protobuf does not use namespaces in terms of AVRO
if schema.schema_type is SchemaType.PROTOBUF:
namespace = ""
naming_strategy = NAME_STRATEGIES[naming_strategy]
return Subject(f"{naming_strategy(topic_name, namespace)}-{subject_type}")


class SchemaRegistrySerializer:
def __init__(
self,
Expand All @@ -243,8 +264,6 @@ def __init__(
else:
registry_url = f"http://{self.config['registry_host']}:{self.config['registry_port']}"
registry_client = SchemaRegistryClient(registry_url, session_auth=session_auth)
name_strategy = config.get("name_strategy", "topic_name")
self.subject_name_strategy = NAME_STRATEGIES.get(name_strategy, topic_name_strategy)
self.registry_client: Optional[SchemaRegistryClient] = registry_client
self.ids_to_schemas: Dict[int, TypedSchema] = {}
self.ids_to_subjects: MutableMapping[int, List[Subject]] = TTLCache(maxsize=10000, ttl=600)
Expand All @@ -255,25 +274,6 @@ async def close(self) -> None:
await self.registry_client.close()
self.registry_client = None

def get_subject_name(
self,
topic_name: str,
schema: TypedSchema,
subject_type: str,
schema_type: SchemaType,
) -> Subject:
namespace = "dummy"
if schema_type is SchemaType.AVRO:
if isinstance(schema.schema, avro.schema.NamedSchema):
namespace = schema.schema.namespace
if schema_type is SchemaType.JSONSCHEMA:
namespace = schema.to_dict().get("namespace", "dummy")
# Protobuf does not use namespaces in terms of AVRO
if schema_type is SchemaType.PROTOBUF:
namespace = ""

return Subject(f"{self.subject_name_strategy(topic_name, namespace)}-{subject_type}")

async def get_schema_for_subject(self, subject: Subject) -> TypedSchema:
assert self.registry_client, "must not call this method after the object is closed."
schema_id, schema, _ = await self.registry_client.get_schema(subject)
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def fixture_session_logdir(request, tmp_path_factory, worker_id) -> Path:


@pytest.fixture(scope="session", name="default_config_path")
def fixture_default_config(session_logdir: Path) -> str:
def fixture_default_config(session_logdir: Path) -> Path:
path = session_logdir / "karapace_config.json"
content = json.dumps({"registry_host": "localhost", "registry_port": 8081}).encode()
content_len = len(content)
Expand All @@ -170,7 +170,7 @@ def fixture_default_config(session_logdir: Path) -> str:
raise OSError(f"Writing config failed, tried to write {content_len} bytes, but only {written} were written")
fp.flush()
os.fsync(fp)
return str(path)
return path


@pytest.fixture(name="tmp_file", scope="function")
Expand Down
Loading

0 comments on commit 3471482

Please sign in to comment.