-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add configurable validation strategy by topic #745
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -74,7 +74,7 @@ class Config(TypedDict): | |||||
session_timeout_ms: int | ||||||
karapace_rest: bool | ||||||
karapace_registry: bool | ||||||
name_strategy: str | ||||||
default_name_strategy: str | ||||||
name_strategy_validation: bool | ||||||
master_election_strategy: str | ||||||
protobuf_runtime_directory: str | ||||||
|
@@ -146,7 +146,7 @@ class ConfigDefaults(Config, total=False): | |||||
"session_timeout_ms": 10000, | ||||||
"karapace_rest": False, | ||||||
"karapace_registry": False, | ||||||
"name_strategy": "topic_name", | ||||||
"default_name_strategy": "topic_name", | ||||||
"name_strategy_validation": True, | ||||||
"master_election_strategy": "lowest", | ||||||
"protobuf_runtime_directory": "runtime", | ||||||
|
@@ -158,17 +158,30 @@ class InvalidConfiguration(Exception): | |||||
pass | ||||||
|
||||||
|
||||||
class StrEnum(str, Enum): | ||||||
def __str__(self) -> str: | ||||||
return str(self.value) | ||||||
|
||||||
|
||||||
@unique | ||||||
class ElectionStrategy(Enum): | ||||||
highest = "highest" | ||||||
lowest = "lowest" | ||||||
|
||||||
|
||||||
@unique | ||||||
class NameStrategy(Enum): | ||||||
class NameStrategy(StrEnum): | ||||||
topic_name = "topic_name" | ||||||
record_name = "record_name" | ||||||
topic_record_name = "topic_record_name" | ||||||
no_validation = "no_validation_strategy" | ||||||
|
||||||
|
||||||
@unique | ||||||
class SubjectType(StrEnum): | ||||||
key = "key" | ||||||
value = "value" | ||||||
partition = "partition" | ||||||
|
||||||
|
||||||
def parse_env_value(value: str) -> str | int | bool: | ||||||
|
@@ -269,12 +282,14 @@ def validate_config(config: Config) -> None: | |||||
f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}" | ||||||
) from None | ||||||
|
||||||
name_strategy = config["name_strategy"] | ||||||
deafault_name_strategy = config["default_name_strategy"] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah you are right, I've copy-pasted the same in the following pr |
||||||
try: | ||||||
NameStrategy(name_strategy) | ||||||
NameStrategy(deafault_name_strategy) | ||||||
except ValueError: | ||||||
valid_strategies = [strategy.value for strategy in NameStrategy] | ||||||
raise InvalidConfiguration(f"Invalid name strategy: {name_strategy}, valid values are {valid_strategies}") from None | ||||||
valid_strategies = list(NameStrategy) | ||||||
raise InvalidConfiguration( | ||||||
f"Invalid default name strategy: {deafault_name_strategy}, valid values are {valid_strategies}" | ||||||
) from None | ||||||
|
||||||
if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None: | ||||||
raise InvalidConfiguration( | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,10 @@ | |
from __future__ import annotations | ||
|
||
from dataclasses import dataclass, field | ||
from karapace.config import NameStrategy | ||
from karapace.schema_models import SchemaVersion, TypedSchema | ||
from karapace.schema_references import Reference, Referents | ||
from karapace.typing import ResolvedVersion, SchemaId, Subject | ||
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName | ||
from threading import Lock, RLock | ||
from typing import Iterable, Sequence | ||
|
||
|
@@ -32,6 +33,7 @@ def __init__(self) -> None: | |
self.schemas: dict[SchemaId, TypedSchema] = {} | ||
self.schema_lock_thread = RLock() | ||
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {} | ||
self.topic_validation_strategies: dict[TopicName, NameStrategy] = {} | ||
|
||
# Content based deduplication of schemas. This is used to reduce memory | ||
# usage when the same schema is produce multiple times to the same or | ||
|
@@ -229,6 +231,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di | |
if schema_version.deleted is False | ||
} | ||
|
||
def get_topic_strategy(self, *, topic_name: TopicName) -> NameStrategy | None: | ||
if topic_name not in self.topic_validation_strategies: | ||
return None | ||
|
||
return self.topic_validation_strategies[topic_name] | ||
Comment on lines
+235
to
+238
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about self.topic_validation_strategies.get(topic_name) ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer an exception instead of a |
||
|
||
def override_topic_strategy(self, *, topic_name: TopicName, name_strategy: NameStrategy) -> None: | ||
self.topic_validation_strategies[topic_name] = name_strategy | ||
|
||
def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None: | ||
with self.schema_lock_thread: | ||
for schema_version in self.subjects[subject].schemas.values(): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ | |
TopicAuthorizationFailedError, | ||
UnknownTopicOrPartitionError, | ||
) | ||
from karapace.config import Config, create_client_ssl_context | ||
from karapace.config import Config, create_client_ssl_context, NameStrategy, SubjectType | ||
from karapace.errors import InvalidSchema | ||
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient | ||
from karapace.kafka_rest_apis.authentication import ( | ||
|
@@ -28,8 +28,14 @@ | |
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE | ||
from karapace.schema_models import TypedSchema, ValidatedTypedSchema | ||
from karapace.schema_type import SchemaType | ||
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError | ||
from karapace.typing import SchemaId, Subject | ||
from karapace.serialization import ( | ||
get_subject_name, | ||
InvalidMessageSchema, | ||
InvalidPayload, | ||
SchemaRegistrySerializer, | ||
SchemaRetrievalError, | ||
) | ||
from karapace.typing import SchemaId, Subject, TopicName | ||
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient | ||
from typing import Callable, Dict, List, Optional, Tuple, Union | ||
|
||
|
@@ -39,7 +45,7 @@ | |
import logging | ||
import time | ||
|
||
RECORD_KEYS = ["key", "value", "partition"] | ||
SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we getting rid of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, if you look previously there was a tricky side effect of using the |
||
PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"} | ||
RECORD_CODES = [42201, 42202] | ||
KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"} | ||
|
@@ -759,7 +765,7 @@ async def get_schema_id( | |
self, | ||
data: dict, | ||
topic: str, | ||
prefix: str, | ||
subject_type: SubjectType, | ||
schema_type: SchemaType, | ||
) -> SchemaId: | ||
""" | ||
|
@@ -770,29 +776,35 @@ async def get_schema_id( | |
""" | ||
log.debug("[resolve schema id] Retrieving schema id for %r", data) | ||
schema_id: Union[SchemaId, None] = ( | ||
SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None | ||
SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None | ||
) | ||
schema_str = data.get(f"{prefix}_schema") | ||
schema_str = data.get(f"{subject_type}_schema") | ||
naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic)) | ||
|
||
if schema_id is None and schema_str is None: | ||
raise InvalidSchema() | ||
|
||
if schema_id is None: | ||
parsed_schema = ValidatedTypedSchema.parse(schema_type, schema_str) | ||
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type) | ||
|
||
subject_name = get_subject_name(topic, parsed_schema, subject_type, naming_strategy) | ||
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name) | ||
else: | ||
|
||
def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool: | ||
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type) | ||
subject = get_subject_name(topic, schema, subject_type, naming_strategy) | ||
return subject not in subjects | ||
|
||
parsed_schema, valid_subjects = await self._query_schema_and_subjects( | ||
schema_id, | ||
need_new_call=subject_not_included, | ||
) | ||
|
||
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects): | ||
if ( | ||
self.config["name_strategy_validation"] | ||
and naming_strategy != NameStrategy.no_validation | ||
and subject_not_included(parsed_schema, valid_subjects) | ||
): | ||
raise InvalidSchema() | ||
|
||
return schema_id | ||
|
@@ -833,7 +845,9 @@ async def _query_schema_id_from_cache_or_registry( | |
) | ||
return schema_id | ||
|
||
async def validate_schema_info(self, data: dict, prefix: str, content_type: str, topic: str, schema_type: str): | ||
async def validate_schema_info( | ||
self, data: dict, subject_type: SubjectType, content_type: str, topic: str, schema_type: str | ||
): | ||
try: | ||
schema_type = SCHEMA_MAPPINGS[schema_type] | ||
except KeyError: | ||
|
@@ -848,7 +862,7 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str, | |
|
||
# will do in place updates of id keys, since calling these twice would be expensive | ||
try: | ||
data[f"{prefix}_schema_id"] = await self.get_schema_id(data, topic, prefix, schema_type) | ||
data[f"{subject_type}_schema_id"] = await self.get_schema_id(data, topic, subject_type, schema_type) | ||
except InvalidPayload: | ||
log.exception("Unable to retrieve schema id") | ||
KafkaRest.r( | ||
|
@@ -863,16 +877,17 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str, | |
KafkaRest.r( | ||
body={ | ||
"error_code": RESTErrorCodes.SCHEMA_RETRIEVAL_ERROR.value, | ||
"message": f"Error when registering schema. format = {schema_type.value}, subject = {topic}-{prefix}", | ||
"message": f"Error when registering schema." | ||
f"format = {schema_type.value}, subject = {topic}-{subject_type}", | ||
}, | ||
content_type=content_type, | ||
status=HTTPStatus.REQUEST_TIMEOUT, | ||
) | ||
except InvalidSchema: | ||
if f"{prefix}_schema" in data: | ||
err = f'schema = {data[f"{prefix}_schema"]}' | ||
if f"{subject_type}_schema" in data: | ||
err = f'schema = {data[f"{subject_type}_schema"]}' | ||
else: | ||
err = f'schema_id = {data[f"{prefix}_schema_id"]}' | ||
err = f'schema_id = {data[f"{subject_type}_schema_id"]}' | ||
KafkaRest.r( | ||
body={ | ||
"error_code": RESTErrorCodes.INVALID_DATA.value, | ||
|
@@ -1002,26 +1017,26 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte | |
status=HTTPStatus.BAD_REQUEST, | ||
) | ||
convert_to_int(r, "partition", content_type) | ||
if set(r.keys()).difference(RECORD_KEYS): | ||
if set(r.keys()).difference({subject_type.value for subject_type in SubjectType}): | ||
KafkaRest.unprocessable_entity( | ||
message="Invalid request format", | ||
content_type=content_type, | ||
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, | ||
) | ||
# disallow missing id and schema for any key/value list that has at least one populated element | ||
if formats["embedded_format"] in {"avro", "jsonschema", "protobuf"}: | ||
for prefix, code in zip(RECORD_KEYS, RECORD_CODES): | ||
if self.all_empty(data, prefix): | ||
for subject_type, code in zip(SUBJECT_VALID_POSTFIX, RECORD_CODES): | ||
if self.all_empty(data, subject_type): | ||
continue | ||
if not self.is_valid_schema_request(data, prefix): | ||
if not self.is_valid_schema_request(data, subject_type): | ||
KafkaRest.unprocessable_entity( | ||
message=f"Request includes {prefix}s and uses a format that requires schemas " | ||
f"but does not include the {prefix}_schema or {prefix}_schema_id fields", | ||
message=f"Request includes {subject_type}s and uses a format that requires schemas " | ||
f"but does not include the {subject_type}_schema or {subject_type.value}_schema_id fields", | ||
content_type=content_type, | ||
sub_code=code, | ||
) | ||
try: | ||
await self.validate_schema_info(data, prefix, content_type, topic, formats["embedded_format"]) | ||
await self.validate_schema_info(data, subject_type, content_type, topic, formats["embedded_format"]) | ||
except InvalidMessageSchema as e: | ||
KafkaRest.unprocessable_entity( | ||
message=str(e), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change would require a major version upgrade
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, I've decided to get rid of this change. This pr needs to be rebased on top of the #754. Probably I will close this pr since the design of making stateful the request for publish using the rest endpoint isn't a great idea.
It's probably mark this pr as draft