diff --git a/karapace/schema_models.py b/karapace/schema_models.py index b2c7b6b94..a4222974e 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -395,11 +395,11 @@ class SchemaVersionManager: MINUS_1_SCHEMA_VERSION_TAG: Final = "-1" @classmethod - def latest_schema_tag_condition(cls, version: Version): + def latest_schema_tag_condition(cls, version: Version) -> bool: return (str(version) == cls.LATEST_SCHEMA_VERSION_TAG) or (str(version) == cls.MINUS_1_SCHEMA_VERSION_TAG) @classmethod - @catch_and_raise_error(to_catch=(ValueError,), to_raise=VersionNotFoundException) + @catch_and_raise_error(to_catch=(ValueError(),), to_raise=VersionNotFoundException()) def resolve_version( cls, schema_versions: Mapping[ResolvedVersion, SchemaVersion], @@ -409,11 +409,11 @@ def resolve_version( if cls.latest_schema_tag_condition(version): return max_version if (int(version) <= max_version) and (int(version) >= int(cls.MINUS_1_SCHEMA_VERSION_TAG)): - return ResolvedVersion(version) + return ResolvedVersion(int(version)) return None @classmethod - @catch_and_raise_error(to_catch=(ValueError,), to_raise=InvalidVersion) + @catch_and_raise_error(to_catch=(ValueError(),), to_raise=InvalidVersion()) def validate_version(cls, version: Version) -> Version | str | None: if cls.latest_schema_tag_condition(version): return cls.LATEST_SCHEMA_VERSION_TAG diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index aa6f1dabc..9521534e4 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -11,7 +11,6 @@ from karapace.dependency import Dependency from karapace.errors import ( IncompatibleSchema, - InvalidVersion, ReferenceExistsException, SchemasNotFoundException, SchemaVersionNotSoftDeletedException, @@ -26,11 +25,18 @@ from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher -from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import ( + ParsedTypedSchema, + SchemaType, + SchemaVersion, + SchemaVersionManager, + TypedSchema, + ValidatedTypedSchema, +) from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject, Version -from typing import Mapping, Sequence +from typing import Sequence import asyncio import logging @@ -38,31 +44,6 @@ LOG = logging.getLogger(__name__) -def _resolve_version( - schema_versions: Mapping[ResolvedVersion, SchemaVersion], - version: Version, -) -> ResolvedVersion: - max_version = max(schema_versions) - if isinstance(version, str) and version == "latest": - return max_version - resolved_version = ResolvedVersion(int(version)) - if resolved_version <= max_version: - return resolved_version - raise VersionNotFoundException() - - -def validate_version(version: Version) -> Version: - try: - version_number = int(version) - if version_number > 0: - return version - raise InvalidVersion(f"Invalid version {version_number}") - except ValueError as ex: - if version == "latest": - return version - raise InvalidVersion(f"Invalid version {version}") from ex - - class KarapaceSchemaRegistry: def __init__(self, config: Config) -> None: self.config = config @@ -82,6 +63,7 @@ def __init__(self, config: Config) -> None: master_coordinator=self.mc, database=self.database, ) + self.schema_version_manager = SchemaVersionManager() self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() @@ -222,7 +204,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version, for version_id, schema_version in schema_versions.items() if schema_version.deleted is False } - resolved_version = _resolve_version(schema_versions=schema_versions, version=version) + resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) schema_version = schema_versions.get(resolved_version, None) if not schema_version: @@ -261,11 +243,11 @@ def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[R return schemas def subject_version_get(self, subject: Subject, version: Version, *, include_deleted: bool = False) -> JsonObject: - validate_version(version) + self.schema_version_manager.validate_version(version) schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = _resolve_version(schema_versions=schema_versions, version=version) + resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: @@ -293,11 +275,11 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del async def subject_version_referencedby_get( self, subject: Subject, version: Version, *, include_deleted: bool = False ) -> list: - validate_version(version) + self.schema_version_manager.validate_version(version) schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = _resolve_version(schema_versions=schema_versions, version=version) + resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: raise VersionNotFoundException() diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 0216b5e5e..d972dd2b6 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -31,9 +31,16 @@ from karapace.karapace import KarapaceBase from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import ( + ParsedTypedSchema, + SchemaType, + SchemaVersion, + SchemaVersionManager, + TypedSchema, + ValidatedTypedSchema, +) from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping -from karapace.schema_registry import KarapaceSchemaRegistry, validate_version +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject from karapace.utils import JSONDecodeError from typing import Any @@ -814,7 +821,7 @@ async def subject_version_delete( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None ) -> None: self._check_authorization(user, Operation.Write, f"Subject:{subject}") - version = validate_version(version) + version = SchemaVersionManager.validate_version(version) permanent = request.query.get("permanent", "false").lower() == "true" are_we_master, master_url = await self.schema_registry.get_master()