From 3eeddf99e87d4ecb1bc347fe503db0757158cabb Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 8 May 2024 14:57:32 +0300 Subject: [PATCH 1/3] AVRO references support --- karapace/schema_models.py | 14 +++- karapace/schema_reader.py | 14 +++- karapace/schema_registry_apis.py | 2 +- .../test_schema_avro_references.py | 64 +++++++++++++++++++ 4 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 tests/integration/test_schema_avro_references.py diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 46e3832d5..01c95dca3 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -138,6 +138,7 @@ def normalize_schema_str( except JSONDecodeError as e: LOG.info("Schema is not valid JSON") raise e + elif schema_type == SchemaType.PROTOBUF: if schema: schema_str = str(schema) @@ -180,6 +181,17 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: return parsed_typed_schema.schema +def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str: + """To support references in AVRO we recursively merge all referenced schemas with current schema""" + if dependencies: + merged_schema = "" + for dependency in dependencies.values(): + merged_schema += avro_schema_merge(dependency.schema.schema_str, dependency.schema.dependencies) + ",\n" + merged_schema += schema_str + return "[\n" + merged_schema + "\n]" + return schema_str + + def parse( schema_type: SchemaType, schema_str: str, @@ -196,7 +208,7 @@ def parse( if schema_type is SchemaType.AVRO: try: parsed_schema = parse_avro_schema_definition( - schema_str, + avro_schema_merge(schema_str, dependencies), validate_enum_symbols=validate_avro_enum_symbols, validate_names=validate_avro_names, ) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 70aaa3a77..455f7eac3 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -501,7 +501,19 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None resolved_dependencies: dict[str, Dependency] | None = None - if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]: + if schema_type_parsed == SchemaType.AVRO: + try: + if schema_references: + candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references] + resolved_references, resolved_dependencies = self.resolve_references(candidate_references) + schema_str = json.dumps(json.loads(schema_str), sort_keys=True) + except json.JSONDecodeError: + LOG.warning("Schema is not valid JSON") + return + except InvalidReferences: + LOG.exception("Invalid AVRO references") + return + elif schema_type_parsed == SchemaType.JSONSCHEMA: try: schema_str = json.dumps(json.loads(schema_str), sort_keys=True) except json.JSONDecodeError: diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 0216b5e5e..65c817e11 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -1043,7 +1043,7 @@ def _validate_references( content_type=content_type, status=HTTPStatus.BAD_REQUEST, ) - if references and schema_type != SchemaType.PROTOBUF: + if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO: self.r( body={ "error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value, diff --git a/tests/integration/test_schema_avro_references.py b/tests/integration/test_schema_avro_references.py new file mode 100644 index 000000000..5c68c14f5 --- /dev/null +++ b/tests/integration/test_schema_avro_references.py @@ -0,0 +1,64 @@ +""" +karapace - schema tests + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +import json + +from karapace.client import Client + +baseurl = "http://localhost:8081" + + +async def test_avro_references(registry_async_client: Client) -> None: + schema_country = { + "type": "record", + "name": "Country", + "namespace": "com.netapp", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "code", "type": "string"} + ] + } + + schema_address = { + "type": "record", + "name": "Address", + "namespace": "com.netapp", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "postalCode", "type": "string"}, + {"name": "country", "type": "Country"} + ] + + } + + res = await registry_async_client.post( + f"subjects/country/versions", json={"schema": json.dumps(schema_country)} + ) + assert res.status_code == 200 + assert "id" in res.json() + country_references = [{"name": "country.proto", "subject": "country", "version": 1}] + + res = await registry_async_client.post( + "subjects/address/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references}, + ) + assert res.status_code == 200 + assert "id" in res.json() + address_id = res.json()["id"] + + # Check if the schema has now been registered under the subject + + res = await registry_async_client.post( + "subjects/address", + json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references}, + ) + assert res.status_code == 200 + assert "subject" in res.json() + assert "id" in res.json() + assert address_id == res.json()["id"] + assert "version" in res.json() + assert "schema" in res.json() From 6515f2c83bb7f85d66afd61b1d6e915be3c6d939 Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 8 May 2024 15:10:29 +0300 Subject: [PATCH 2/3] pylint fixes --- .../integration/test_schema_avro_references.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/integration/test_schema_avro_references.py b/tests/integration/test_schema_avro_references.py index 5c68c14f5..ecfafe999 100644 --- a/tests/integration/test_schema_avro_references.py +++ b/tests/integration/test_schema_avro_references.py @@ -4,10 +4,10 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -import json - from karapace.client import Client +import json + baseurl = "http://localhost:8081" @@ -16,10 +16,7 @@ async def test_avro_references(registry_async_client: Client) -> None: "type": "record", "name": "Country", "namespace": "com.netapp", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "code", "type": "string"} - ] + "fields": [{"name": "name", "type": "string"}, {"name": "code", "type": "string"}], } schema_address = { @@ -30,14 +27,11 @@ async def test_avro_references(registry_async_client: Client) -> None: {"name": "street", "type": "string"}, {"name": "city", "type": "string"}, {"name": "postalCode", "type": "string"}, - {"name": "country", "type": "Country"} - ] - + {"name": "country", "type": "Country"}, + ], } - res = await registry_async_client.post( - f"subjects/country/versions", json={"schema": json.dumps(schema_country)} - ) + res = await registry_async_client.post("subjects/country/versions", json={"schema": json.dumps(schema_country)}) assert res.status_code == 200 assert "id" in res.json() country_references = [{"name": "country.proto", "subject": "country", "version": 1}] From c250295e5186fbddefddda2a699acd2241627672 Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 8 May 2024 15:30:34 +0300 Subject: [PATCH 3/3] remove pylint issues with too many returns --- karapace/schema_reader.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 40a05d8fa..27e7095f2 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -500,9 +500,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: try: schema_type_parsed = SchemaType(schema_type) - except ValueError: + except ValueError as e: LOG.warning("Invalid schema type: %s", schema_type) - return + raise e # This does two jobs: # - Validates the schema's JSON @@ -519,18 +519,18 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references] resolved_references, resolved_dependencies = self.resolve_references(candidate_references) schema_str = json.dumps(json.loads(schema_str), sort_keys=True) - except json.JSONDecodeError: + except json.JSONDecodeError as e: LOG.warning("Schema is not valid JSON") - return - except InvalidReferences: + raise e + except InvalidReferences as e: LOG.exception("Invalid AVRO references") - return + raise e elif schema_type_parsed == SchemaType.JSONSCHEMA: try: schema_str = json.dumps(json.loads(schema_str), sort_keys=True) - except json.JSONDecodeError: + except json.JSONDecodeError as e: LOG.warning("Schema is not valid JSON") - return + raise e elif schema_type_parsed == SchemaType.PROTOBUF: try: if schema_references: @@ -544,12 +544,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: normalize=False, ) schema_str = str(parsed_schema) - except InvalidSchema: + except InvalidSchema as e: LOG.exception("Schema is not valid ProtoBuf definition") - return - except InvalidReferences: + raise e + except InvalidReferences as e: LOG.exception("Invalid Protobuf references") - return + raise e try: typed_schema = TypedSchema( @@ -559,8 +559,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: dependencies=resolved_dependencies, schema=parsed_schema, ) - except (InvalidSchema, JSONDecodeError): - return + except (InvalidSchema, JSONDecodeError) as e: + raise e self.database.insert_schema_version( subject=schema_subject,