From 6b3392c4ac6311041c3f30dc53485348377730fe Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 5 Nov 2024 14:14:29 +0000 Subject: [PATCH] Better Avro schema reference handling. fixes: #406 - `AvroReferenceFinder` added to recursively look for Avro schema references. - `SchemaReader` now returns an ordered list of schema that need to be registered, in the order they need to be registered - `SchemaChangeSetCalculators` now ignores the version number when seeing if schema have changed (fixing a bug where it always thought the schema had changed, due to different version numbers). - `SchemaChangeSetCalculators` now marks any schema outside the domain as `IGNORED` - `SchemaMutators` includes `IGNORED` schemas, so that users can see that common schemas are not registered. --- .../cli/ProvisionNestedFunctionalTest.java | 58 ++- .../cli/StorageConsumptionFunctionalTest.java | 2 +- .../{_public => }/UserSignedUp.java | 2 +- .../resources/nested_schema_demo-api.yaml | 29 +- .../resources/schema/other.domain.Common.avsc | 8 + .../simple.schema_demo.SuperUserSignedUp.avsc | 10 + ...c => simple.schema_demo.UserSignedUp.avsc} | 2 +- ...schema_demo._public.SuperUserSignedUp.avsc | 9 - .../resources/simple_schema_demo-api.yaml | 2 +- .../test/resources/simple_spec_demo-api.yaml | 2 +- .../kafka/ClientsFunctionalDemoTest.java | 2 +- .../simple/schema_demo}/UserSignedUp.java | 2 +- ...le.schema_demo._public.user_signed_up.avsc | 2 +- ....schema_demo._public.user_signed_up_2.avsc | 2 +- .../provision/schema/AvroReferenceFinder.java | 388 ++++++++++++++++ .../schema/SchemaChangeSetCalculators.java | 166 ++++--- .../provision/schema/SchemaMutators.java | 37 +- .../provision/schema/SchemaProvisioner.java | 149 ++++--- .../kafka/provision/schema/SchemaReaders.java | 170 +++---- .../ProvisionerFreshStartFunctionalTest.java | 4 +- .../kafka/admin/SimpleAdminClientTest.java | 27 +- .../SchemaProvisionerFunctionalTest.java | 9 +- .../SchemaProvisionerReferenceTest.java | 131 +++--- .../schema/AvroReferenceFinderTest.java | 414 ++++++++++++++++++ .../SchemaChangeSetCalculatorsTest.java | 161 ++++++- .../kafka/schema/SrSchemaManagerTest.java | 2 +- .../simple/schema_demo}/UserSignedUp.java | 2 +- .../com.example.single-spec-with-refs-api.yml | 58 --- .../schema/com.example.trading.Trade.avsc | 5 + .../schema/com.example.trading.TradeInfo.avsc | 8 + .../resources/schema/other.domain.Common.avsc | 8 + ...vision_demo._public.user_signed_up-v2.avsc | 2 +- ...on_demo._public.user_signed_up-v3-bad.avsc | 2 +- ...provision_demo._public.user_signed_up.avsc | 2 +- ...ision_demo._public.user_signed_up.key.avsc | 2 +- ...le.schema_demo._public.user_signed_up.avsc | 2 +- 36 files changed, 1408 insertions(+), 473 deletions(-) rename cli/src/test/java/simple/schema_demo/{_public => }/UserSignedUp.java (95%) create mode 100644 cli/src/test/resources/schema/other.domain.Common.avsc create mode 100644 cli/src/test/resources/schema/simple.schema_demo.SuperUserSignedUp.avsc rename cli/src/test/resources/schema/{simple.schema_demo._public.UserSignedUp.avsc => simple.schema_demo.UserSignedUp.avsc} (80%) delete mode 100644 cli/src/test/resources/schema/simple.schema_demo._public.SuperUserSignedUp.avsc rename {kafka/src/test/java/simple/schema_demo/_public/user_signed_up_value => kafka-test/src/test/java/simple/schema_demo}/UserSignedUp.java (93%) create mode 100644 kafka/src/main/java/io/specmesh/kafka/provision/schema/AvroReferenceFinder.java create mode 100644 kafka/src/test/java/io/specmesh/kafka/provision/schema/AvroReferenceFinderTest.java rename {kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value => kafka/src/test/java/simple/schema_demo}/UserSignedUp.java (93%) delete mode 100644 kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml create mode 100644 kafka/src/test/resources/schema-ref/schema/com.example.trading.TradeInfo.avsc create mode 100644 kafka/src/test/resources/schema/other.domain.Common.avsc diff --git a/cli/src/test/java/io/specmesh/cli/ProvisionNestedFunctionalTest.java b/cli/src/test/java/io/specmesh/cli/ProvisionNestedFunctionalTest.java index 6842d221..fe6d1fd0 100644 --- a/cli/src/test/java/io/specmesh/cli/ProvisionNestedFunctionalTest.java +++ b/cli/src/test/java/io/specmesh/cli/ProvisionNestedFunctionalTest.java @@ -17,14 +17,23 @@ package io.specmesh.cli; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.specmesh.kafka.DockerKafkaEnvironment; import io.specmesh.kafka.KafkaEnvironment; +import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.TopicProvisioner.Topic; +import io.specmesh.kafka.provision.schema.SchemaProvisioner; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -45,6 +54,8 @@ class ProvisionNestedFunctionalTest { @Test void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly() { // Given: + givenCommonSchemaRegistered(); + final Provision provision = new Provision(); new CommandLine(provision) @@ -67,13 +78,25 @@ void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly( assertThat( status.topics().stream().map(Topic::name).collect(Collectors.toSet()), - is( - containsInAnyOrder( - "simple.schema_demo._public.super_user_signed_up", - "simple.schema_demo._public.user_signed_up"))); + is(contains("simple.schema_demo._public.super_user_signed_up"))); + + assertThat( + status.schemas().stream() + .filter(s -> s.state() == Status.STATE.CREATED) + .map(SchemaProvisioner.Schema::subject) + .collect(Collectors.toList()), + containsInAnyOrder( + "simple.schema_demo._public.super_user_signed_up-value", + "simple.schema_demo._public.UserSignedUp")); + + assertThat( + status.schemas().stream() + .filter(s -> s.state() == Status.STATE.IGNORED) + .map(SchemaProvisioner.Schema::subject) + .collect(Collectors.toList()), + contains("other.domain.Common.subject")); assertThat(status.acls(), hasSize(10)); - assertThat(status.schemas(), hasSize(2)); // When: final var statusRepublish = provision.run(); @@ -82,10 +105,25 @@ void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly( assertThat(statusRepublish.failed(), is(false)); assertThat(statusRepublish.topics(), is(empty())); assertThat(statusRepublish.acls(), is(empty())); - assertThat( - "should be empty, but is 1 due to version number difference when comparing" - + " references", - statusRepublish.schemas(), - hasSize(1)); + + final List schemas = + statusRepublish.schemas().stream() + .filter(s -> s.state() != Status.STATE.IGNORED) + .collect(Collectors.toList()); + + assertThat(schemas, is(empty())); + } + + private void givenCommonSchemaRegistered() { + try (SchemaRegistryClient srClient = KAFKA_ENV.srClient()) { + final ParsedSchema schema = + new AvroSchema( + Files.readString( + Path.of( + "./src/test/resources/schema/other.domain.Common.avsc"))); + srClient.register("other.domain.Common.subject", schema); + } catch (Exception e) { + throw new AssertionError("failed to register common schema", e); + } } } diff --git a/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java b/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java index d752036a..a23854be 100644 --- a/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java +++ b/cli/src/test/java/io/specmesh/cli/StorageConsumptionFunctionalTest.java @@ -66,7 +66,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import picocli.CommandLine; -import simple.schema_demo._public.UserSignedUp; +import simple.schema_demo.UserSignedUp; @SuppressFBWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"}) class StorageConsumptionFunctionalTest { diff --git a/cli/src/test/java/simple/schema_demo/_public/UserSignedUp.java b/cli/src/test/java/simple/schema_demo/UserSignedUp.java similarity index 95% rename from cli/src/test/java/simple/schema_demo/_public/UserSignedUp.java rename to cli/src/test/java/simple/schema_demo/UserSignedUp.java index e552065a..8043a295 100644 --- a/cli/src/test/java/simple/schema_demo/_public/UserSignedUp.java +++ b/cli/src/test/java/simple/schema_demo/UserSignedUp.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package simple.schema_demo._public; +package simple.schema_demo; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/cli/src/test/resources/nested_schema_demo-api.yaml b/cli/src/test/resources/nested_schema_demo-api.yaml index 89657068..e3ce60bc 100644 --- a/cli/src/test/resources/nested_schema_demo-api.yaml +++ b/cli/src/test/resources/nested_schema_demo-api.yaml @@ -14,31 +14,6 @@ servers: url: mqtt://test.mosquitto.org protocol: mqtt channels: - _public.user_signed_up: - bindings: - kafka: - envs: - - staging - - prod - partitions: 3 - replicas: 1 - configs: - cleanup.policy: delete - - publish: - operationId: onLightMeasured - message: - bindings: - kafka: - key: - type: long - schemaIdLocation: "payload" - schemaLookupStrategy: "RecordNameStrategy" - schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" - contentType: "application/octet-stream" - payload: - $ref: "/schema/simple.schema_demo._public.UserSignedUp.avsc" - _public.super_user_signed_up: # publish bindings to instruct topic configuration per environment bindings: @@ -58,9 +33,7 @@ channels: kafka: key: type: long - schemaIdLocation: "payload" - schemaLookupStrategy: "RecordNameStrategy" schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" payload: - $ref: "/schema/simple.schema_demo._public.SuperUserSignedUp.avsc" + $ref: "/schema/simple.schema_demo.SuperUserSignedUp.avsc" diff --git a/cli/src/test/resources/schema/other.domain.Common.avsc b/cli/src/test/resources/schema/other.domain.Common.avsc new file mode 100644 index 00000000..35ef04ff --- /dev/null +++ b/cli/src/test/resources/schema/other.domain.Common.avsc @@ -0,0 +1,8 @@ +{ + "type": "record", + "namespace": "other.domain", + "name": "Common", + "fields": [ + {"name": "thing", "type": "string"} + ] +} \ No newline at end of file diff --git a/cli/src/test/resources/schema/simple.schema_demo.SuperUserSignedUp.avsc b/cli/src/test/resources/schema/simple.schema_demo.SuperUserSignedUp.avsc new file mode 100644 index 00000000..e6488708 --- /dev/null +++ b/cli/src/test/resources/schema/simple.schema_demo.SuperUserSignedUp.avsc @@ -0,0 +1,10 @@ +{ + "type": "record", + "namespace": "simple.schema_demo", + "name": "SuperUserSignedUp", + "fields": [ + {"name": "role", "type": "string"}, + {"name": "user", "type": "simple.schema_demo.UserSignedUp", "subject": "simple.schema_demo._public.UserSignedUp"}, + {"name": "common", "type": "other.domain.Common", "subject": "other.domain.Common.subject"} + ] +} \ No newline at end of file diff --git a/cli/src/test/resources/schema/simple.schema_demo._public.UserSignedUp.avsc b/cli/src/test/resources/schema/simple.schema_demo.UserSignedUp.avsc similarity index 80% rename from cli/src/test/resources/schema/simple.schema_demo._public.UserSignedUp.avsc rename to cli/src/test/resources/schema/simple.schema_demo.UserSignedUp.avsc index 047ebd07..9c3420c0 100644 --- a/cli/src/test/resources/schema/simple.schema_demo._public.UserSignedUp.avsc +++ b/cli/src/test/resources/schema/simple.schema_demo.UserSignedUp.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.schema_demo._public", + "namespace": "simple.schema_demo", "name": "UserSignedUp", "fields": [ {"name": "fullName", "type": "string"}, diff --git a/cli/src/test/resources/schema/simple.schema_demo._public.SuperUserSignedUp.avsc b/cli/src/test/resources/schema/simple.schema_demo._public.SuperUserSignedUp.avsc deleted file mode 100644 index 5c458f4c..00000000 --- a/cli/src/test/resources/schema/simple.schema_demo._public.SuperUserSignedUp.avsc +++ /dev/null @@ -1,9 +0,0 @@ -{ - "type": "record", - "namespace": "simple.schema_demo._public", - "name": "SuperUserSignedUp", - "fields": [ - {"name": "role", "type": "string"}, - {"name": "user", "type": "simple.schema_demo._public.UserSignedUp", "subject": "simple.schema_demo._public.UserSignedUp"} - ] -} \ No newline at end of file diff --git a/cli/src/test/resources/simple_schema_demo-api.yaml b/cli/src/test/resources/simple_schema_demo-api.yaml index 11a09f56..322ff782 100644 --- a/cli/src/test/resources/simple_schema_demo-api.yaml +++ b/cli/src/test/resources/simple_schema_demo-api.yaml @@ -39,7 +39,7 @@ channels: schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" payload: - $ref: "/schema/simple.schema_demo._public.UserSignedUp.avsc" + $ref: "/schema/simple.schema_demo.UserSignedUp.avsc" # PRODUCER/OWNER build pipe will publish schema to SR _public.user_checkout: diff --git a/cli/src/test/resources/simple_spec_demo-api.yaml b/cli/src/test/resources/simple_spec_demo-api.yaml index 44d08195..1c819b1b 100644 --- a/cli/src/test/resources/simple_spec_demo-api.yaml +++ b/cli/src/test/resources/simple_spec_demo-api.yaml @@ -36,7 +36,7 @@ channels: schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" contentType: "application/octet-stream" payload: - $ref: "/schema/simple.schema_demo._public.UserSignedUp.avsc" + $ref: "/schema/simple.schema_demo.UserSignedUp.avsc" _private/user_checkout: bindings: diff --git a/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java index 3a2dc6ac..761f85ec 100644 --- a/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java +++ b/kafka-test/src/test/java/io/specmesh/kafka/ClientsFunctionalDemoTest.java @@ -80,7 +80,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import simple.schema_demo._public.user_signed_up_value.UserSignedUp; +import simple.schema_demo.UserSignedUp; class ClientsFunctionalDemoTest { diff --git a/kafka/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java b/kafka-test/src/test/java/simple/schema_demo/UserSignedUp.java similarity index 93% rename from kafka/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java rename to kafka-test/src/test/java/simple/schema_demo/UserSignedUp.java index ff73f1cb..8043a295 100644 --- a/kafka/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java +++ b/kafka-test/src/test/java/simple/schema_demo/UserSignedUp.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package simple.schema_demo._public.user_signed_up_value; +package simple.schema_demo; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc index 73361bfc..9c3420c0 100644 --- a/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc +++ b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.schema_demo._public.user_signed_up_value", + "namespace": "simple.schema_demo", "name": "UserSignedUp", "fields": [ {"name": "fullName", "type": "string"}, diff --git a/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up_2.avsc b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up_2.avsc index 73361bfc..9c3420c0 100644 --- a/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up_2.avsc +++ b/kafka-test/src/test/resources/schema/simple.schema_demo._public.user_signed_up_2.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.schema_demo._public.user_signed_up_value", + "namespace": "simple.schema_demo", "name": "UserSignedUp", "fields": [ {"name": "fullName", "type": "string"}, diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/AvroReferenceFinder.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/AvroReferenceFinder.java new file mode 100644 index 00000000..e915f677 --- /dev/null +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/AvroReferenceFinder.java @@ -0,0 +1,388 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.kafka.provision.schema; + +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Helper for finding schema references within an Avro schema. + * + *

To be found, a reference to another type must have a {@code subject} defined. Any type with a + * {@code subject} defined will also be inspected for references. + * + *

For example, given the schema: + * + *

{@code
+ * {
+ *   "type": "record",
+ *   "name": "TypeA",
+ *   "fields": [
+ *     {"name": "f1", "type": "TypeB", "subject": "type.b.subject"}
+ *   ]
+ * }
+ * }
+ * + *

This class will attempt to load {@code TypeB}, by calling on the {@link SchemaLoader}, and + * then parse it looking for anymore references. + * + *

Namespacing

+ * + *

The `type` field can be a simple type, e.g. {@code TypeB}, or fully qualified, e.g. {@code + * some.namespace.TypeB}. Unqualified type names will be prefixed with the current namespace, if + * any. + * + *

For example, given the schema: + * + *

{@code
+ * {
+ *   "type": "record",
+ *   "name": "TypeA",
+ *   "namespace": "some.namespace",
+ *   "fields": [
+ *    {"name": "f1", "type": "TypeB", "subject": "type.b.subject"},
+ *    {"name": "f2", "type": "other.namespace.TypeC", "subject": "type.c.subject"}
+ *  ]
+ * }
+ * }
+ * + *

The {@link SchemaLoader} will be invoked for {@code some.namespace.TypeB} and {@code + * other.namespace.TypeC}. + * + *

Error handling

+ * + *

The finder does not try to validate the supplied schemas are valid Avro. That's left to the + * Avro libraries. However, they must at least be valid JSON, otherwise exceptions will be thrown. + */ +final class AvroReferenceFinder { + + /** Responsible for loading the contents of a type's schema, given the type's name. */ + interface SchemaLoader { + + /** + * Load the schema content. + * + * @param type the name of the type. + * @return the content of the schema. + */ + String load(String type); + } + + private static final JsonMapper MAPPER = + JsonMapper.builder().enable(JsonParser.Feature.ALLOW_COMMENTS).build(); + + private final SchemaLoader schemaLoader; + + /** + * @param schemaLoader called to load the content of a schema for any referenced types. + */ + AvroReferenceFinder(final SchemaLoader schemaLoader) { + this.schemaLoader = requireNonNull(schemaLoader, "schemaLoader"); + } + + /** + * Find all the schema references in the supplied {@code schema}. + * + * @param schemaContent the schema content to check for references. + * @return an ordered stream of leaf-first referenced schemas, including the supplied {@code + * schema}. + */ + List findReferences(final String schemaContent) { + final ParsedSchema schema = ParsedSchema.create("", "", schemaContent); + + final String name = schema.name.orElse(""); + final String namespace = schema.namespace.orElse(""); + final String fullyQualifiedName = namespace.isEmpty() ? name : namespace + "." + name; + + final Set visited = new HashSet<>(); + visited.add(fullyQualifiedName); + + return findReferences(schema, visited); + } + + private List findReferences( + final ParsedSchema schema, final Set visited) { + final String type = schema.type.orElse(""); + if (!"record".equals(type)) { + return List.of(new DetectedSchema(schema, List.of())); + } + + final List detected = + schema.nestedTypes.stream() + .filter(nested -> visited.add(nested.name)) + .map( + nested -> + findReferences( + ParsedSchema.create( + nested.name, + nested.subject, + loadSchema(nested.name)), + visited)) + .flatMap(List::stream) + .collect(Collectors.toList()); + + detected.add(new DetectedSchema(schema, detected)); + + return List.copyOf(detected); + } + + private String loadSchema(final String type) { + try { + return Objects.requireNonNull(schemaLoader.load(type), "loader returned null"); + } catch (final Exception e) { + throw new SchemaLoadException("Failed to load schema for type: " + type, e); + } + } + + private static final class NestedType { + final String name; + final String subject; + + NestedType(final String name, final String subject) { + this.name = requireNonNull(name, "name"); + this.subject = requireNonNull(subject, "subject"); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final NestedType that = (NestedType) o; + return Objects.equals(name, that.name) && Objects.equals(subject, that.subject); + } + + @Override + public int hashCode() { + return Objects.hash(name, subject); + } + } + + public static final class DetectedSchema { + private final String name; + private final String subject; + private final String content; + private final List nestedSchemas; + + DetectedSchema(final ParsedSchema source, final List nestedSchemas) { + this(source.fullName(), source.subject, source.content, nestedSchemas); + } + + DetectedSchema( + final String name, + final String subject, + final String content, + final List nestedSchemas) { + this.name = requireNonNull(name, "name"); + this.subject = requireNonNull(subject, "subject"); + this.content = requireNonNull(content, "content"); + this.nestedSchemas = List.copyOf(requireNonNull(nestedSchemas, "nestedSchemas")); + } + + public String name() { + return name; + } + + public String subject() { + return subject; + } + + public String content() { + return content; + } + + public List references() { + return List.copyOf(nestedSchemas); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DetectedSchema)) { + return false; + } + final DetectedSchema that = (DetectedSchema) o; + return Objects.equals(name, that.name) + && Objects.equals(subject, that.subject) + && Objects.equals(content, that.content) + && Objects.equals(nestedSchemas, that.nestedSchemas); + } + + @Override + public int hashCode() { + return Objects.hash(name, subject, content, nestedSchemas); + } + + @Override + public String toString() { + return "DetectedSchema{" + + "name='" + + name + + '\'' + + ", subject='" + + subject + + '\'' + + ", content='" + + content + + '\'' + + ", nestedSchemas=" + + nestedSchemas + + '}'; + } + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private static final class ParsedSchema { + + private final String subject; + private final String content; + private final Optional type; + private final Optional name; + private final Optional namespace; + private final List nestedTypes; + + private ParsedSchema( + final String subject, + final String content, + final Optional type, + final Optional name, + final Optional namespace, + final List nestedTypes) { + this.subject = requireNonNull(subject, "subject"); + this.content = requireNonNull(content, "content"); + this.type = requireNonNull(type, "type"); + this.name = requireNonNull(name, "name"); + this.namespace = requireNonNull(namespace, "namespace"); + this.nestedTypes = requireNonNull(nestedTypes, "nestedTypes"); + } + + public String fullName() { + if (name.isEmpty()) { + throw new IllegalStateException("Unnamed schema: " + this); + } + return namespace.map(s -> s + "." + name.get()).orElseGet(name::get); + } + + private static ParsedSchema create( + final String typeName, final String subject, final String content) { + try { + final JsonNode rootNode = MAPPER.readTree(content); + final Optional type = textChild("type", rootNode); + final Optional name = textChild("name", rootNode); + final Optional namespace = textChild("namespace", rootNode); + final List nestedTypes = + findNestedTypesWithSubject(rootNode, namespace.orElse("")); + return new ParsedSchema( + subject, + content, + type, + Optional.of(name.orElse(typeName)), + namespace, + nestedTypes); + } catch (final Exception e) { + throw new InvalidSchemaException(typeName, content, e); + } + } + + private static Optional textChild(final String name, final JsonNode node) { + return Optional.ofNullable(node.get(name)) + .filter(JsonNode::isTextual) + .map(JsonNode::asText); + } + + private static List findNestedTypesWithSubject( + final JsonNode node, final String ns) { + final Optional maybeType = + textChild("type", node).filter(text -> !text.isEmpty()); + + final Optional maybeSubject = + textChild("subject", node).filter(text -> !text.isEmpty()); + + if (maybeType.isPresent() && maybeSubject.isPresent()) { + final String type = maybeType.get(); + + if ("array".equals(type)) { + return textChild("items", node) + .filter(text -> !text.isEmpty()) + .map(items -> new NestedType(namespaced(items, ns), maybeSubject.get())) + .map(List::of) + .orElse(List.of()); + } + + if ("map".equals(type)) { + return textChild("values", node) + .filter(text -> !text.isEmpty()) + .map(items -> new NestedType(namespaced(items, ns), maybeSubject.get())) + .map(List::of) + .orElse(List.of()); + } + + return List.of(new NestedType(namespaced(type, ns), maybeSubject.get())); + } + + final List results = new ArrayList<>(0); + for (JsonNode child : node) { + results.addAll(findNestedTypesWithSubject(child, ns)); + } + return results; + } + + private static String namespaced(final String type, final String ns) { + if (ns.isEmpty()) { + return type; + } + + final boolean alreadyNamespaced = type.contains("."); + return alreadyNamespaced ? type : ns + "." + type; + } + } + + private static final class InvalidSchemaException extends RuntimeException { + + InvalidSchemaException(final String typeName, final String content, final Exception cause) { + super( + String.format( + "Schema content invalid. %scontent: %s", named(typeName), content), + cause); + } + + private static String named(final String typeName) { + return typeName.isBlank() ? "" : String.format("name: %s, ", typeName); + } + } + + private static final class SchemaLoadException extends RuntimeException { + SchemaLoadException(final String msg, final Throwable cause) { + super(msg, cause); + } + } +} diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculators.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculators.java index 844d113e..de332283 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculators.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculators.java @@ -16,78 +16,71 @@ package io.specmesh.kafka.provision.schema; +import static java.util.Objects.requireNonNull; + +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Calculates a changeset of Schemas to create or update, should also return incompatible changes * for existing schemas */ -public final class SchemaChangeSetCalculators { +final class SchemaChangeSetCalculators { /** defensive */ private SchemaChangeSetCalculators() {} /** Collection based */ - public static final class Collective implements ChangeSetCalculator { + static final class Collective implements ChangeSetCalculator { - private final Stream calculatorStream; + private final List calculatorStream; + private final IgnoreCalculator finalCalculator; - /** - * iterate over the calculators - * - * @param writers to iterate - */ - private Collective(final ChangeSetCalculator... writers) { - this.calculatorStream = Arrays.stream(writers); + private Collective( + final IgnoreCalculator finalCalculator, final ChangeSetCalculator... writers) { + this.finalCalculator = requireNonNull(finalCalculator, "finalCalculator"); + this.calculatorStream = List.of(writers); } - /** - * delegates updates - * - * @param existing schemas - * @param required needed schemas - * @return updated status - */ @Override public Collection calculate( - final Collection existing, final Collection required) { - return this.calculatorStream - .map(calculator -> calculator.calculate(existing, required)) - .flatMap(Collection::stream) - .collect(Collectors.toList()); + final Collection existing, + final Collection required, + final String domainId) { + return finalCalculator.calculate( + calculatorStream.stream() + .map(calculator -> calculator.calculate(existing, required, domainId)) + .flatMap(Collection::stream) + .collect(Collectors.toList()), + domainId); } } /** Return set of 'unspecific' (i.e. non-required) schemas */ - public static class CleanUnspecifiedCalculator implements ChangeSetCalculator { + static class CleanUnspecifiedCalculator implements ChangeSetCalculator { - /** - * remove the required items from the existing.. the remainder are not specified - * - * @param existing - existing - * @param required - needed - * @return schemas that aren't specd - */ @Override public Collection calculate( - final Collection existing, final Collection required) { + final Collection existing, + final Collection required, + final String domainId) { existing.removeAll(required); return existing; } } /** Returns those schemas to create and ignores existing */ - public static final class UpdateCalculator implements ChangeSetCalculator { + static final class UpdateCalculator implements ChangeSetCalculator { private final SchemaRegistryClient client; @@ -95,31 +88,28 @@ private UpdateCalculator(final SchemaRegistryClient client) { this.client = client; } - /** - * Calculate the set of schemas to Update and also checks compatibility - * - * @param existing - existing - * @param required - needed - * @return updated set of schemas - */ @Override public Collection calculate( - final Collection existing, final Collection required) { + final Collection existing, + final Collection required, + final String domainId) { final var existingList = new ArrayList<>(existing); return required.stream() - .filter(needs -> existing.contains(needs) && hasChanged(needs, existingList)) + .filter(needs -> hasChanged(needs, existingList)) .peek( schema -> { schema.messages(schema.messages() + "\n Update"); try { final var compatibilityMessages = client.testCompatibilityVerbose( - schema.subject(), schema.getSchema()); - schema.messages( - schema.messages() - + "\nCompatibility test output:" - + compatibilityMessages); + schema.subject(), schema.schema()); + if (!compatibilityMessages.isEmpty()) { + schema.messages( + schema.messages() + + "\nCompatibility test output:" + + compatibilityMessages); + schema.state(Status.STATE.FAILED); } else { schema.state(Status.STATE.UPDATE); @@ -137,26 +127,44 @@ private boolean hasChanged(final Schema needs, final List existingList) final var foundAt = existingList.indexOf(needs); if (foundAt != -1) { final var existing = existingList.get(foundAt); - return !existing.getSchema().equals(needs.getSchema()); + return !normalizeSchema(existing.schema()).equals(normalizeSchema(needs.schema())); } else { return false; } } + + private ParsedSchema normalizeSchema(final ParsedSchema schema) { + if (!(schema instanceof AvroSchema)) { + // References not yet supported: + return schema.normalize(); + } + + final AvroSchema avroSchema = (AvroSchema) schema.normalize(); + + final List references = + avroSchema.references().stream() + .map(ref -> new SchemaReference(ref.getName(), ref.getSubject(), -1)) + .collect(Collectors.toList()); + + return new AvroSchema( + avroSchema.canonicalString(), + references, + avroSchema.resolvedReferences(), + avroSchema.metadata(), + avroSchema.ruleSet(), + -1, + false); + } } /** Returns those schemas to create and ignores existing */ - public static final class CreateCalculator implements ChangeSetCalculator { + static final class CreateCalculator implements ChangeSetCalculator { - /** - * Calculate set of schemas that dont already exist - * - * @param existing - existing schemas - state READ - * @param required - needed schemas - state CREATE - * @return set required to create - status set to CREATE - */ @Override public Collection calculate( - final Collection existing, final Collection required) { + final Collection existing, + final Collection required, + final String domainId) { return required.stream() .filter( schema -> @@ -169,6 +177,29 @@ public Collection calculate( } } + /** Ignores schemas from outside the domain */ + static final class IgnoreCalculator { + + public Collection calculate( + final Collection required, final String domainId) { + return required.stream() + .map(schema -> markIgnoredIfOutsideDomain(schema, domainId)) + .collect(Collectors.toList()); + } + + private Schema markIgnoredIfOutsideDomain(final Schema schema, final String domainId) { + if (schema.schema() instanceof AvroSchema) { + final AvroSchema avroSchema = (AvroSchema) schema.schema(); + if (!avroSchema.rawSchema().getNamespace().equals(domainId)) { + return schema.state(Status.STATE.IGNORED) + .messages("\n ignored as it does not belong to the domain"); + } + } + + return schema; + } + } + /** Main API */ interface ChangeSetCalculator { /** @@ -177,9 +208,11 @@ interface ChangeSetCalculator { * * @param existing - existing * @param required - needed - * @return - set of those that dont exist + * @param domainId the id of the domain being provisioned + * @return - set of those that don't exist */ - Collection calculate(Collection existing, Collection required); + Collection calculate( + Collection existing, Collection required, String domainId); } /** @@ -187,12 +220,12 @@ interface ChangeSetCalculator { * * @return builder */ - public static ChangeSetBuilder builder() { + static ChangeSetBuilder builder() { return ChangeSetBuilder.builder(); } /** Builder of the things */ - public static final class ChangeSetBuilder { + static final class ChangeSetBuilder { /** defensive */ private ChangeSetBuilder() {} @@ -202,7 +235,7 @@ private ChangeSetBuilder() {} * * @return builder */ - public static ChangeSetBuilder builder() { + static ChangeSetBuilder builder() { return new ChangeSetBuilder(); } @@ -213,13 +246,16 @@ public static ChangeSetBuilder builder() { * @param client sr client * @return required calculator */ - public ChangeSetCalculator build( + ChangeSetCalculator build( final boolean cleanUnspecified, final SchemaRegistryClient client) { if (cleanUnspecified) { return new CleanUnspecifiedCalculator(); } else { - return new Collective(new UpdateCalculator(client), new CreateCalculator()); + return new Collective( + new IgnoreCalculator(), + new UpdateCalculator(client), + new CreateCalculator()); } } } diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaMutators.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaMutators.java index b2e648fb..03e26cb3 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaMutators.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaMutators.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -63,7 +64,7 @@ private CollectiveMutator(final SchemaMutator... mutators) { * @return updated status */ @Override - public Collection mutate(final Collection schemas) { + public List mutate(final Collection schemas) { return this.mutators .map(mutator -> mutator.mutate(schemas)) .flatMap(Collection::stream) @@ -92,12 +93,13 @@ public CleanUnspecifiedMutator(final boolean dryRun, final SchemaRegistryClient * @return updated set of schemas */ @Override - public Collection mutate(final Collection schemas) { + public List mutate(final Collection schemas) { return schemas.stream() .filter( schema -> !schema.state().equals(STATE.UPDATE) - && !schema.state().equals(STATE.CREATE)) + && !schema.state().equals(STATE.CREATE) + && !schema.state().equals(STATE.IGNORED)) .peek( schema -> { try { @@ -144,14 +146,14 @@ public UpdateMutator(final SchemaRegistryClient client) { * @return updated set of schemas */ @Override - public Collection mutate(final Collection schemas) { + public List mutate(final Collection schemas) { return schemas.stream() .filter(schema -> schema.state().equals(STATE.UPDATE)) .peek( schema -> { try { final var schemaId = - client.register(schema.subject(), schema.getSchema()); + client.register(schema.subject(), schema.schema()); schema.state(UPDATED); schema.messages( "Subject:" @@ -191,7 +193,7 @@ private WriteMutator(final SchemaRegistryClient client) { * @return set of schemas with CREATE or FAILED + Exception */ @Override - public Collection mutate(final Collection schemas) { + public List mutate(final Collection schemas) { return schemas.stream() .filter(schema -> schema.state().equals(STATE.CREATE)) @@ -199,7 +201,7 @@ public Collection mutate(final Collection schemas) { schema -> { try { final var schemaId = - client.register(schema.subject(), schema.getSchema()); + client.register(schema.subject(), schema.schema()); client.updateCompatibility(schema.subject(), DEFAULT_EVOLUTION); schema.messages( "Subject:" @@ -221,6 +223,18 @@ public Collection mutate(final Collection schemas) { } } + /** Passes through ignored schemas so they appear in the output */ + public static final class IgnoredMutator implements SchemaMutator { + + @Override + public List mutate(final Collection schemas) { + + return schemas.stream() + .filter(schema -> schema.state().equals(STATE.IGNORED)) + .collect(Collectors.toList()); + } + } + /** Do nothing mutator */ public static final class NoopSchemaMutator implements SchemaMutator { @@ -231,8 +245,8 @@ public static final class NoopSchemaMutator implements SchemaMutator { * @return schemas without status change */ @Override - public Collection mutate(final Collection schemas) { - return schemas; + public List mutate(final Collection schemas) { + return List.copyOf(schemas); } } @@ -244,7 +258,7 @@ interface SchemaMutator { * @param schemas to write * @return updated status of schemas */ - Collection mutate(Collection schemas); + List mutate(Collection schemas); } /** @@ -321,7 +335,8 @@ public SchemaMutator build() { } else if (dryRun) { return new NoopSchemaMutator(); } else { - return new CollectiveMutator(new UpdateMutator(client), new WriteMutator(client)); + return new CollectiveMutator( + new UpdateMutator(client), new WriteMutator(client), new IgnoredMutator()); } } } diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java index f8a3fcac..62391d0f 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaProvisioner.java @@ -30,13 +30,16 @@ import io.specmesh.kafka.provision.ProvisioningException; import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.WithState; +import io.specmesh.kafka.provision.schema.SchemaReaders.FileSystemSchemaReader.NamedSchema; import io.specmesh.kafka.provision.schema.SchemaReaders.SchemaReader; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; -import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.AccessLevel; @@ -51,6 +54,7 @@ public final class SchemaProvisioner { /** defensive */ private SchemaProvisioner() {} + /** * Provision schemas to Schema Registry * @@ -61,7 +65,7 @@ private SchemaProvisioner() {} * @param client the client for the schema registry * @return status of actions */ - public static Collection provision( + public static List provision( final boolean dryRun, final boolean cleanUnspecified, final KafkaApiSpec apiSpec, @@ -72,22 +76,15 @@ public static Collection provision( final var existing = reader.read(apiSpec.id()); - final var required = requiredSchemas(apiSpec, baseResourcePath); + final List required = requiredSchemas(apiSpec, baseResourcePath); if (required.stream().anyMatch(schema -> schema.state.equals(FAILED))) { throw new SchemaProvisioningException("Required Schemas Failed to load:" + required); } - final var schemas = calculator(client, cleanUnspecified).calculate(existing, required); - return mutator(dryRun, cleanUnspecified, client).mutate(sortByReferences(schemas)); - } - - private static Collection sortByReferences(final Collection schemas) { - return schemas.stream() - .sorted( - Comparator.comparingInt( - o -> o.schemas.iterator().next().references().size())) - .collect(Collectors.toList()); + final Collection schemas = + calculator(client, cleanUnspecified).calculate(existing, required, apiSpec.id()); + return mutator(dryRun, cleanUnspecified, client).mutate(schemas); } /** @@ -128,67 +125,89 @@ private static SchemaChangeSetCalculators.ChangeSetCalculator calculator( */ private static List requiredSchemas( final KafkaApiSpec apiSpec, final String baseResourcePath) { + final Path basePath = Paths.get(baseResourcePath); + + final Set seenSubjects = new HashSet<>(); return apiSpec.listDomainOwnedTopics().stream() - .flatMap(topic -> topicSchemas(apiSpec, baseResourcePath, topic.name())) + .flatMap(topic -> topicSchemas(apiSpec, basePath, topic.name())) + .filter(e -> seenSubjects.add(e.subject())) .collect(Collectors.toList()); } private static Stream topicSchemas( - final KafkaApiSpec apiSpec, final String baseResourcePath, final String topicName) { - return apiSpec.ownedTopicSchemas(topicName).stream() - .flatMap( + final KafkaApiSpec apiSpec, final Path baseResourcePath, final String topicName) { + return apiSpec.ownedTopicSchemas(topicName) + .map( si -> Stream.of( - si.key() - .flatMap(RecordPart::schemaRef) - .map( - schemaRef -> - partSchema( - "key", - schemaRef, - si, - baseResourcePath, - topicName)), - si.value() - .schemaRef() - .map( - schemaRef -> - partSchema( - "value", - schemaRef, - si, - baseResourcePath, - topicName)))) - .flatMap(Optional::stream); + si.key() + .flatMap(RecordPart::schemaRef) + .map( + keySchema -> + partSchemas( + "key", + keySchema, + si, + baseResourcePath, + topicName)), + si.value() + .schemaRef() + .map( + valueSchema -> + partSchemas( + "value", + valueSchema, + si, + baseResourcePath, + topicName))) + .flatMap(Optional::stream) + .flatMap(Function.identity())) + .orElse(Stream.empty()); } - private static Schema partSchema( + private static Stream partSchemas( final String partName, final String schemaRef, final SchemaInfo si, - final String baseResourcePath, + final Path baseResourcePath, final String topicName) { - final Schema.SchemaBuilder builder = Schema.builder(); try { - final Path schemaPath = Paths.get(baseResourcePath, schemaRef); - final Collection schemas = + final Path schemaPath = Path.of(baseResourcePath.toString(), schemaRef); + final List schemas = new SchemaReaders.FileSystemSchemaReader().readLocal(schemaPath); - builder.schemas(schemas) - .type(schemaRef) - .subject(resolveSubjectName(topicName, schemas, si, partName)); - builder.state(CREATE); + return schemas.stream() + .map( + ns -> { + final ParsedSchema schema = ns.schema(); + final String subject = + ns.subject().isEmpty() + ? resolveSubjectName( + topicName, schema, si, partName) + : ns.subject(); + + return Schema.builder() + .schema(schema) + .type(schema.schemaType()) + .subject(subject) + .state(CREATE) + .build(); + }); } catch (ProvisioningException ex) { - builder.state(FAILED); - builder.exception(ex); + return Stream.of( + Schema.builder() + .messages("Failed to parse: " + schemaRef) + .state(FAILED) + .exception(ex) + .build()); } - return builder.build(); } /** - * Follow these guidelines for Confluent SR and APICurio - * https://docs.confluent.io/platform/6.2/schema-registry/serdes-develop/index.html#referenced-schemas - * https://docs.confluent.io/platform/6.2/schema-registry/serdes-develop/index.html#subject-name-strategy + * Follow these guidelines for Confluent + * SR and APICurio * *

APICurio SimpleTopicIdStrategy - Simple strategy that only uses the topic name. * RecordIdStrategy - Avro-specific strategy that uses the full name of the schema. @@ -201,16 +220,11 @@ private static Schema partSchema( * group logically related events that may have different data structures under a subject. * TopicRecordNameStrategy - Derives the subject name from topic and record name, as a way to * group logically related events that may have different data structures under a subject. - * - * @param topicName - * @param schemas - * @param schemaInfo - * @return */ @SuppressWarnings("checkstyle:CyclomaticComplexity") private static String resolveSubjectName( final String topicName, - final Collection schemas, + final ParsedSchema schema, final SchemaInfo schemaInfo, final String partName) { final String lookup = schemaInfo.schemaLookupStrategy().orElse(""); @@ -220,25 +234,22 @@ private static String resolveSubjectName( } if (lookup.equalsIgnoreCase("RecordNameStrategy") || lookup.equalsIgnoreCase("RecordIdStrategy")) { - final var next = schemas.iterator().next(); - if (!isAvro(next)) { + if (!isAvro(schema)) { throw new UnsupportedOperationException( "Currently, only avro schemas support RecordNameStrategy and" + " RecordIdStrategy"); } - return ((AvroSchema) next).rawSchema().getFullName(); + return ((AvroSchema) schema).rawSchema().getFullName(); } if (lookup.equalsIgnoreCase("TopicRecordIdStrategy") || lookup.equalsIgnoreCase("TopicRecordNameStrategy")) { - - final var next = schemas.iterator().next(); - if (!isAvro(next)) { + if (!isAvro(schema)) { throw new UnsupportedOperationException( "Currently, only avro schemas support TopicRecordNameStrategy and" + " TopicRecordIdStrategy"); } - return topicName + "-" + ((AvroSchema) next).rawSchema().getFullName(); + return topicName + "-" + ((AvroSchema) schema).rawSchema().getFullName(); } return topicName + "-" + partName; } @@ -272,16 +283,12 @@ public static final class Schema implements WithState { private Exception exception; @Builder.Default private String messages = ""; - Collection schemas; + private ParsedSchema schema; public Schema exception(final Exception exception) { this.exception = new ExceptionWrapper(exception); return this; } - - public ParsedSchema getSchema() { - return this.schemas.iterator().next(); - } } public static class SchemaProvisioningException extends RuntimeException { diff --git a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java index 7bd73bf1..8dfb982f 100644 --- a/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java +++ b/kafka/src/main/java/io/specmesh/kafka/provision/schema/SchemaReaders.java @@ -16,9 +16,10 @@ package io.specmesh.kafka.provision.schema; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toMap; + import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.kafka.schemaregistry.ParsedSchema; @@ -29,15 +30,16 @@ import io.confluent.kafka.schemaregistry.json.JsonSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.specmesh.kafka.provision.Status; +import io.specmesh.kafka.provision.schema.AvroReferenceFinder.DetectedSchema; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.kafka.provision.schema.SchemaProvisioner.SchemaProvisioningException; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -59,73 +61,72 @@ private SchemaReaders() {} public static final class FileSystemSchemaReader { - public Collection readLocal(final Path filePath) { - try { - final var schemaContent = Files.readString(filePath); - final var results = new ArrayList(); - - final String filename = - Optional.ofNullable(filePath.getFileName()) - .map(Objects::toString) - .orElse(""); - if (filename.endsWith(".avsc")) { - final var refs = resolveReferencesFor(filePath, schemaContent); - results.add( - new AvroSchema( - schemaContent, refs.references, refs.resolvedReferences, -1)); - } else if (filename.endsWith(".yml")) { - results.add(new JsonSchema(schemaContent)); - } else if (filename.endsWith(".proto")) { - results.add(new ProtobufSchema(schemaContent)); - } else { - throw new UnsupportedOperationException("Unsupported schema file: " + filePath); - } - - return results; - } catch (Exception ex) { - throw new SchemaProvisioningException( - "Failed to load: " + filePath + " from: " + filePath.toAbsolutePath(), ex); + public static final class NamedSchema { + private final String subject; + private final ParsedSchema schema; + + public NamedSchema(final String subject, final ParsedSchema schema) { + this.subject = requireNonNull(subject, "subject"); + this.schema = requireNonNull(schema, "parsedSchema"); + } + + public ParsedSchema schema() { + return schema; + } + + public String subject() { + return subject; } } /** - * Avro schema reference resolution - * - * @param filePath - * @param schemaContent - * @return + * @param filePath path to schema. + * @return ordered list of schema, with schema dependencies earlier in the list. The schema + * loaded from {@code filePath} will have an empty subject */ - private SchemaReferences resolveReferencesFor( - final Path filePath, final String schemaContent) { - try { - final SchemaReferences results = new SchemaReferences(); - final var refs = findJsonNodes(objectMapper.readTree(schemaContent), "subject"); - final var parent = filePath.toFile().getParent(); - refs.forEach(ref -> results.add(parent, ref)); - return results; - } catch (JsonProcessingException e) { - throw new SchemaProvisioningException( - "Cannot resolve SchemaReferences for:" + filePath, e); + public List readLocal(final Path filePath) { + final String schemaContent = readSchema(filePath); + final String filename = + Optional.ofNullable(filePath.getFileName()).map(Objects::toString).orElse(""); + if (filename.endsWith(".avsc")) { + final Path schemaDir = filePath.toAbsolutePath().getParent(); + final AvroReferenceFinder refFinder = + new AvroReferenceFinder( + type -> readSchema(schemaDir.resolve(type + ".avsc"))); + + return refFinder.findReferences(schemaContent).stream() + .map(s -> new NamedSchema(s.subject(), toAvroSchema(s))) + .collect(Collectors.toList()); + } else if (filename.endsWith(".yml")) { + return List.of(new NamedSchema("", new JsonSchema(schemaContent))); + } else if (filename.endsWith(".proto")) { + return List.of(new NamedSchema("", new ProtobufSchema(schemaContent))); + } else { + throw new UnsupportedOperationException("Unsupported schema file: " + filePath); } } - private List findJsonNodes(final JsonNode node, final String searchFor) { - if (node.has(searchFor)) { - return List.of(node); - } - // Recursively traverse child nodes - final var results = new ArrayList(); - if (node.isArray()) { - for (JsonNode child : node) { - results.addAll(findJsonNodes(child, searchFor)); - } - return results; - } else if (node.isObject()) { - for (Iterator> it = node.fields(); it.hasNext(); ) { - results.addAll(findJsonNodes(it.next().getValue(), searchFor)); - } + private static AvroSchema toAvroSchema(final DetectedSchema schema) { + + final List references = + schema.references().stream() + .map(ref -> new SchemaReference(ref.name(), ref.subject(), -1)) + .collect(Collectors.toList()); + + final Map resolvedReferences = + schema.references().stream() + .collect(toMap(DetectedSchema::subject, DetectedSchema::content)); + + return new AvroSchema(schema.content(), references, resolvedReferences, -1); + } + + private String readSchema(final Path path) { + try { + return Files.readString(path, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new SchemaProvisioningException( + "Failed to read schema at path:" + path.toAbsolutePath(), e); } - return results; } } /** Read Schemas from registry for given prefix */ @@ -156,7 +157,7 @@ public Collection read(final String prefix) { final var schemas = subjects.stream() .collect( - Collectors.toMap( + toMap( subject -> subject, subject -> { try { @@ -175,7 +176,7 @@ public Collection read(final String prefix) { Schema.builder() .subject(entry.getKey()) .type(entry.getValue().get(0).schemaType()) - .schemas(resolvePayload(entry.getValue().get(0))) + .schema(entry.getValue().get(0)) .state(Status.STATE.READ) .build()) .collect(Collectors.toList()); @@ -183,29 +184,6 @@ public Collection read(final String prefix) { throw new SchemaProvisioningException("Failed to read schemas for:" + prefix, e); } } - - private List resolvePayload(final ParsedSchema schema) { - return List.of(parsedSchema(schema)); - } - - private ParsedSchema parsedSchema(final ParsedSchema schema) { - final String type = schema.schemaType(); - final String payload = schema.canonicalString(); - if (type.endsWith(".avsc") || type.equals("AVRO")) { - return new AvroSchema( - payload, - schema.references(), - ((AvroSchema) schema).resolvedReferences(), - -1); - } - if (type.endsWith(".yml") || type.equals("JSON")) { - return new JsonSchema(payload); - } - if (type.endsWith(".proto") || type.equals("PROTOBUF")) { - return new ProtobufSchema(payload); - } - return null; - } } /** Read Acls API */ @@ -269,21 +247,11 @@ public SchemaReader build() { @Accessors(fluent = true) public static class SchemaReferences { final List references = new ArrayList<>(); - final Map resolvedReferences = new HashMap<>(); - - public void add(final String path, final JsonNode ref) { - try { - references.add( - new SchemaReference( - ref.get("name").asText(), ref.get("subject").asText(), -1)); + final Map resolvedReferences = new LinkedHashMap<>(); - resolvedReferences.put( - ref.get("subject").asText(), - Files.readString(Path.of(path, ref.get("subject").asText() + ".avsc"))); - } catch (IOException e) { - throw new SchemaProvisioningException( - "Cannot construct AVRO SchemaReference from:" + ref, e); - } + public void add(final String type, final String subject, final String content) { + references.add(new SchemaReference(type, subject, -1)); + resolvedReferences.put(subject, content); } } } diff --git a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java index 820971d3..9dc40d7a 100644 --- a/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/ProvisionerFreshStartFunctionalTest.java @@ -361,8 +361,8 @@ void shouldPublishSchemasFromEmptyCluster() throws RestClientException, IOExcept is( containsInAnyOrder( "io.specmesh.kafka.schema.UserInfo", - "simple.provision_demo._public.user_signed_up_value.key.UserSignedUpKey", - "simple.provision_demo._public.user_signed_up_value.UserSignedUp"))); + "simple.provision_demo.UserSignedUpKey", + "simple.provision_demo.UserSignedUp"))); } private static Set aclsForOtherDomain(final Domain domain) { diff --git a/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java b/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java index fcf7e4ec..80449e22 100644 --- a/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/admin/SimpleAdminClientTest.java @@ -27,14 +27,13 @@ import io.specmesh.kafka.KafkaApiSpec; import io.specmesh.kafka.KafkaEnvironment; import io.specmesh.kafka.provision.Provisioner; +import io.specmesh.kafka.provision.Status; import io.specmesh.test.TestSpecLoader; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; @@ -45,7 +44,7 @@ import org.apache.kafka.common.serialization.LongSerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import simple.schema_demo._public.user_signed_up_value.UserSignedUp; +import simple.schema_demo.UserSignedUp; class SimpleAdminClientTest { @@ -63,7 +62,7 @@ class SimpleAdminClientTest { TestSpecLoader.loadFromClassPath("clientapi-functional-test-api.yaml"); @Test - void shouldRecordStats() throws ExecutionException, InterruptedException, TimeoutException { + void shouldRecordStats() throws Exception { final var userSignedUpTopic = topicName("_public.user_signed_up"); final var sentRecord = new UserSignedUp("joe blogs", "blogy@twasmail.com", 100); @@ -71,15 +70,17 @@ void shouldRecordStats() throws ExecutionException, InterruptedException, Timeou try (Admin adminClient = KAFKA_ENV.adminClient()) { final var client = SmAdminClient.create(adminClient); - Provisioner.builder() - .apiSpec(API_SPEC) - .schemaPath("./src/test/resources") - .adminClient(adminClient) - .schemaRegistryClient(KAFKA_ENV.srClient()) - .closeSchemaClient(true) - .build() - .provision() - .check(); + final Status status = + Provisioner.builder() + .apiSpec(API_SPEC) + .schemaPath("./src/test/resources") + .adminClient(adminClient) + .schemaRegistryClient(KAFKA_ENV.srClient()) + .closeSchemaClient(true) + .build() + .provision(); + + status.check(); // write seed info try (var producer = avroProducer(OWNER_USER)) { diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java index 1dbbe9e9..e0ef549f 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerFunctionalTest.java @@ -35,7 +35,6 @@ import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.test.TestSpecLoader; import java.util.Collection; -import java.util.List; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -168,8 +167,8 @@ void shouldPublishUpdatedSchemas() throws Exception { is( containsInAnyOrder( "io.specmesh.kafka.schema.UserInfo", - "simple.provision_demo._public.user_signed_up_value.key.UserSignedUpKey", - "simple.provision_demo._public.user_signed_up_value.UserSignedUp"))); + "simple.provision_demo.UserSignedUpKey", + "simple.provision_demo.UserSignedUp"))); } @Test @@ -191,12 +190,12 @@ void shouldRemoveUnspecdSchemas() throws Exception { Schema.builder() .subject(subject) .type("/schema/simple.provision_demo._public.user_signed_up.avsc") - .schemas(List.of(new AvroSchema(schemaContent))) + .schema(new AvroSchema(schemaContent)) .state(STATE.READ) .build(); // insert the bad schema - srClient.register(subject, schema.getSchema()); + srClient.register(subject, schema.schema()); testDryRun(subject); testCleanUnSpecSchemas(); diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java index 60d02a7e..42d8edcc 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/SchemaProvisionerReferenceTest.java @@ -17,14 +17,21 @@ package io.specmesh.kafka.provision; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import io.specmesh.kafka.DockerKafkaEnvironment; import io.specmesh.kafka.KafkaApiSpec; import io.specmesh.kafka.KafkaEnvironment; import io.specmesh.kafka.provision.schema.SchemaProvisioner; +import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import io.specmesh.test.TestSpecLoader; -import java.util.ArrayList; -import org.hamcrest.Matchers; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -44,10 +51,6 @@ class SchemaProvisionerReferenceTest { private static final KafkaApiSpec API_SPEC = TestSpecLoader.loadFromClassPath("schema-ref/com.example.trading-api.yml"); - private static final KafkaApiSpec SPEC_WITH_REFS_API_SPEC = - TestSpecLoader.loadFromClassPath( - "schema-ref/com.example.single-spec-with-refs-api.yml"); - private static final String ADMIN_USER = "admin"; @RegisterExtension @@ -62,9 +65,9 @@ class SchemaProvisionerReferenceTest { @Test @Order(1) - void shouldProvisionSpecWithMissingRefToCurrency() { - - final var provision = + void shouldFailToProvisionIfSharedSchemaAreNotRegistered() { + // When: + final List provision = SchemaProvisioner.provision( false, false, @@ -72,82 +75,86 @@ void shouldProvisionSpecWithMissingRefToCurrency() { "./src/test/resources/schema-ref", KAFKA_ENV.srClient()); - final var schemaState = provision.iterator().next(); + // Then: + assertThat(provision, hasSize(3)); + + final Map bySubject = + provision.stream().collect(Collectors.toMap(Schema::subject, Function.identity())); + assertThat( + bySubject.keySet(), + is( + Set.of( + "com.example.shared.Currency", + "com.example.trading._public.trade-value", + "com.example.trading.TradeInfo"))); + + final Schema commonSchema = bySubject.get("com.example.shared.Currency"); + assertThat(commonSchema.state(), is(Status.STATE.IGNORED)); assertThat( - "Failed to load with:" + schemaState.toString(), - schemaState.state(), - Matchers.is(Status.STATE.FAILED)); + commonSchema.messages(), endsWith("ignored as it does not belong to the domain")); + + final Schema infoSchema = bySubject.get("com.example.trading.TradeInfo"); + assertThat(infoSchema.state(), is(Status.STATE.CREATED)); + + final Schema tradeSchema = bySubject.get("com.example.trading._public.trade-value"); + assertThat(tradeSchema.state(), is(Status.STATE.FAILED)); } - /** publish common schema (via api) and also domain-owned schema */ + @Test @Order(2) - void shouldProvisionTwoSpecsWithRefs() { - - final var provisionCommon = + void shouldProvisionOnceSharedSchemaAreRegistered() { + // Given: + SchemaProvisioner.provision( + false, + false, + COMMON_API_SPEC, + "./src/test/resources/schema-ref", + KAFKA_ENV.srClient()); + + // When: + final List provision = SchemaProvisioner.provision( false, false, - COMMON_API_SPEC, + API_SPEC, "./src/test/resources/schema-ref", KAFKA_ENV.srClient()); - final var commonSchemaState = provisionCommon.iterator().next(); + // Then: + final Map bySubject = + provision.stream().collect(Collectors.toMap(Schema::subject, Function.identity())); assertThat( - "Failed to load with:" + commonSchemaState.toString(), - commonSchemaState.state(), - Matchers.is(Status.STATE.CREATED)); + bySubject.keySet(), + is( + Set.of( + "com.example.shared.Currency", + "com.example.trading._public.trade-value"))); - final var provision = - SchemaProvisioner.provision( - false, - false, - API_SPEC, - "./src/test/resources/schema-ref", - KAFKA_ENV.srClient()); + final Schema commonSchema = bySubject.get("com.example.shared.Currency"); + assertThat(commonSchema.state(), is(Status.STATE.IGNORED)); - final var schemaState = provision.iterator().next(); - assertThat( - "Failed to load with:" + schemaState.toString(), - schemaState.state(), - Matchers.is(Status.STATE.CREATED)); + final Schema tradeSchema = bySubject.get("com.example.trading._public.trade-value"); + assertThat(tradeSchema.state(), is(Status.STATE.CREATED)); } @Test @Order(3) - void shouldProvisionSpecsWithMultipleRefsSoThatZeroRefsRegisterFirst() { - - final var provisionBothSchemas = + void shouldNotProvisionAnythingIfNothingHasChanged() { + // When: + final List provision = SchemaProvisioner.provision( false, false, - SPEC_WITH_REFS_API_SPEC, + API_SPEC, "./src/test/resources/schema-ref", KAFKA_ENV.srClient()); - final var schemas = new ArrayList<>(provisionBothSchemas); - - assertThat(schemas, Matchers.hasSize(2)); + // Then: + final Map bySubject = + provision.stream().collect(Collectors.toMap(Schema::subject, Function.identity())); + assertThat(bySubject.keySet(), is(Set.of("com.example.shared.Currency"))); - final var currencySchemaState = schemas.get(0); - assertThat( - "Failed to load with:" + currencySchemaState.subject(), - currencySchemaState.subject(), - Matchers.is("com.example.shared.Currency")); - - assertThat( - "Failed to load with:" + currencySchemaState, - currencySchemaState.state(), - Matchers.is(Status.STATE.CREATED)); - - final var tradeSchemaState = schemas.get(1); - assertThat( - "Failed to load with:" + tradeSchemaState.subject(), - tradeSchemaState.subject(), - Matchers.is("com.example.refs._public.trade-value")); - - assertThat( - "Failed to load with:" + tradeSchemaState, - tradeSchemaState.state(), - Matchers.is(Status.STATE.CREATED)); + final Schema commonSchema = bySubject.get("com.example.shared.Currency"); + assertThat(commonSchema.state(), is(Status.STATE.IGNORED)); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/schema/AvroReferenceFinderTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/schema/AvroReferenceFinderTest.java new file mode 100644 index 00000000..f8841fdf --- /dev/null +++ b/kafka/src/test/java/io/specmesh/kafka/provision/schema/AvroReferenceFinderTest.java @@ -0,0 +1,414 @@ +/* + * Copyright 2023 SpecMesh Contributors (https://github.com/specmesh) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.specmesh.kafka.provision.schema; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonParseException; +import io.specmesh.kafka.provision.schema.AvroReferenceFinder.DetectedSchema; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class AvroReferenceFinderTest { + + @Mock private AvroReferenceFinder.SchemaLoader schemaLoader; + private AvroReferenceFinder refFinder; + + @BeforeEach + void setUp() { + refFinder = new AvroReferenceFinder(schemaLoader); + } + + @Test + void shouldThrowIfSchemaCanNotBeParsed() { + // Given: + final String schema = "not-json"; + + // When: + final Exception e = + assertThrows(RuntimeException.class, () -> refFinder.findReferences(schema)); + + // Then: + assertThat(e.getMessage(), is("Schema content invalid. content: not-json")); + assertThat(e.getCause(), is(instanceOf(JsonParseException.class))); + } + + @Test + void shouldThrowIfReferencedSchemaCanNotBeParsed() { + // Given: a -> b + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"}\n" + + " ]\n" + + "}"; + + when(schemaLoader.load(any())).thenReturn("invalid-schema"); + + // When: + final Exception e = assertThrows(RuntimeException.class, () -> refFinder.findReferences(a)); + + // Then: + assertThat( + e.getMessage(), is("Schema content invalid. name: TypeB, content: invalid-schema")); + assertThat(e.getCause(), is(instanceOf(JsonParseException.class))); + } + + @Test + void shouldReturnSchemaIfJsonButNotAvro() { + // Given: + final String schema = "1234"; + + // When: + final List result = refFinder.findReferences(schema); + + // Then: + assertThat(result, contains(new DetectedSchema("", "", schema, List.of()))); + } + + @Test + void shouldReturnSchemaIfNoReferences() { + // Given: + final String schema = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"SomeType\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f\", \"type\": \"string\"}\n" + + " ]\n" + + "}"; + + // When: + final List result = refFinder.findReferences(schema); + + // Then: + assertThat(result, contains(new DetectedSchema("SomeType", "", schema, List.of()))); + } + + @Test + void shouldReturnSchemaIfNotRecord() { + // Given: + final String schema = "{\n" + " \"type\": \"int\"\n" + "}"; + + // When: + final List result = refFinder.findReferences(schema); + + // Then: + assertThat(result, contains(new DetectedSchema("", "", schema, List.of()))); + } + + @Test + void shouldReturnLeafFirst() { + // Given: a -> b -> c + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"}\n" + + " ]\n" + + "}"; + + final String b = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeB\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f2\", \"type\": \"TypeC\", \"subject\":" + + " \"type.c.subject\"}\n" + + " ]\n" + + "}"; + + final String c = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeC\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f3\", \"type\": \"string\"}\n" + + " ]\n" + + "}"; + + when(schemaLoader.load("TypeB")).thenReturn(b); + when(schemaLoader.load("TypeC")).thenReturn(c); + + // When: + final List result = refFinder.findReferences(a); + + // Then: + final DetectedSchema schemaC = new DetectedSchema("TypeC", "type.c.subject", c, List.of()); + final DetectedSchema schemaB = + new DetectedSchema("TypeB", "type.b.subject", b, List.of(schemaC)); + final DetectedSchema schemaA = + new DetectedSchema("TypeA", "", a, List.of(schemaC, schemaB)); + assertThat(result, contains(schemaC, schemaB, schemaA)); + } + + @Test + void shouldSupportFullyQualifiedTypes() { + // Given: a -> b -> c + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"namespace\": \"ns.one\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"}\n" + + " ]\n" + + "}"; + + final String b = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeB\",\n" + + " \"namespace\": \"ns.one\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f2\", \"type\": \"ns.two.TypeC\", \"subject\":" + + " \"type.c.subject\"}\n" + + " ]\n" + + "}"; + + final String c = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeC\",\n" + + " \"namespace\": \"ns.two\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f3\", \"type\": \"string\"}\n" + + " ]\n" + + "}"; + + when(schemaLoader.load("ns.one.TypeB")).thenReturn(b); + when(schemaLoader.load("ns.two.TypeC")).thenReturn(c); + + // When: + final List result = refFinder.findReferences(a); + + // Then: + final DetectedSchema schemaC = + new DetectedSchema("ns.two.TypeC", "type.c.subject", c, List.of()); + final DetectedSchema schemaB = + new DetectedSchema("ns.one.TypeB", "type.b.subject", b, List.of(schemaC)); + final DetectedSchema schemaA = + new DetectedSchema("ns.one.TypeA", "", a, List.of(schemaC, schemaB)); + assertThat(result, contains(schemaC, schemaB, schemaA)); + } + + @Test + void shouldNotRevisitSameSchemaTwice() { + // Given: a -> b + // | -> b + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"},\n" + + " {\"name\": \"f2\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"}\n" + + " ]\n" + + "}"; + + final String b = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeB\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f2\", \"type\": \"string\"}\n" + + " ]\n" + + "}"; + + when(schemaLoader.load("TypeB")).thenReturn(b); + + // When: + refFinder.findReferences(a); + + // Then: + verify(schemaLoader, times(1)).load(any()); + } + + @Test + void shouldHandleCircularReferencesByIgnoringThem() { + // Given: a -> b -> c + // a <- | | + // b <- | + // a <-------| + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"}\n" + + " ]\n" + + "}"; + + final String b = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeB\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f2\", \"type\": \"TypeC\", \"subject\":" + + " \"type.c.subject\"},\n" + + " {\"name\": \"f3\", \"type\": \"TypeA\", \"subject\":" + + " \"type.a.subject\"}\n" + + " ]\n" + + "}"; + + final String c = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeC\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f4\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"},\n" + + " {\"name\": \"f5\", \"type\": \"TypeA\", \"subject\":" + + " \"type.a.subject\"}\n" + + " ]\n" + + "}"; + + when(schemaLoader.load("TypeB")).thenReturn(b); + when(schemaLoader.load("TypeC")).thenReturn(c); + + // When: + final List result = refFinder.findReferences(a); + + // Then: + final DetectedSchema schemaC = new DetectedSchema("TypeC", "type.c.subject", c, List.of()); + final DetectedSchema schemaB = + new DetectedSchema("TypeB", "type.b.subject", b, List.of(schemaC)); + final DetectedSchema schemaA = + new DetectedSchema("TypeA", "", a, List.of(schemaC, schemaB)); + assertThat(result, contains(schemaC, schemaB, schemaA)); + verify(schemaLoader, times(2)).load(any()); + } + + @Test + void shouldThrowIfReferenceCanNotBeResolved() { + // Given: + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": \"TypeB\", \"subject\":" + + " \"type.b.subject\"}\n" + + " ]\n" + + "}"; + + final RuntimeException cause = mock(); + when(schemaLoader.load("TypeB")).thenThrow(cause); + + // When: + final Exception e = assertThrows(RuntimeException.class, () -> refFinder.findReferences(a)); + + // Then: + assertThat(e.getMessage(), is("Failed to load schema for type: TypeB")); + assertThat(e.getCause(), is(sameInstance(cause))); + } + + @Test + void shouldPickUpReferencesFromArrays() { + // Given: a -> b + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": {\"type\": \"array\", \"items\": \"TypeB\"," + + " \"subject\": \"type.b.subject\"}}\n" + + " ]\n" + + "}"; + + final String b = "{\n" + " \"type\": \"int\"\n" + "}"; + + when(schemaLoader.load("TypeB")).thenReturn(b); + + // When: + final List result = refFinder.findReferences(a); + + // Then: + final DetectedSchema schemaB = new DetectedSchema("TypeB", "type.b.subject", b, List.of()); + final DetectedSchema schemaA = new DetectedSchema("TypeA", "", a, List.of(schemaB)); + assertThat(result, contains(schemaB, schemaA)); + } + + @Test + void shouldPickUpReferencesFromMaps() { + // Given: a -> b + final String a = + "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TypeA\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f1\", \"type\": {\"type\": \"map\", \"values\": \"TypeB\"," + + " \"subject\": \"type.b.subject\"}}\n" + + " ]\n" + + "}"; + + final String b = "{\n" + " \"type\": \"int\"\n" + "}"; + + when(schemaLoader.load("TypeB")).thenReturn(b); + + // When: + final List result = refFinder.findReferences(a); + + // Then: + final DetectedSchema schemaB = new DetectedSchema("TypeB", "type.b.subject", b, List.of()); + final DetectedSchema schemaA = new DetectedSchema("TypeA", "", a, List.of(schemaB)); + assertThat(result, contains(schemaB, schemaA)); + } + + @Test + void shouldHandleSchemaWithComments() { + // Given: + final String schema = + "{\n" + + " // a comment" + + " \"type\": \"record\",\n" + + " \"name\": \"SomeType\",\n" + + " \"fields\": [\n" + + " {\"name\": \"f\", \"type\": \"string\"}\n" + + " ]\n" + + "}"; + + // When: + refFinder.findReferences(schema); + + // Then: did not throw + } +} diff --git a/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java b/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java index 167675ee..50fa5963 100644 --- a/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/provision/schema/SchemaChangeSetCalculatorsTest.java @@ -17,14 +17,21 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.specmesh.kafka.DockerKafkaEnvironment; import io.specmesh.kafka.KafkaEnvironment; import io.specmesh.kafka.provision.Status; import io.specmesh.kafka.provision.schema.SchemaProvisioner.Schema; import java.nio.file.Path; +import java.util.Collection; import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -33,44 +40,49 @@ class SchemaChangeSetCalculatorsTest { @RegisterExtension private static final KafkaEnvironment KAFKA_ENV = DockerKafkaEnvironment.builder().build(); - public static final String SCHEMA_FILENAME = "simple.provision_demo._public.user_signed_up"; + private static final String DOMAIN_ID = "simple.provision_demo"; + private static final String SCHEMA_BASE = "simple.provision_demo._public."; + + private SchemaRegistryClient client; + private String subject; + + @BeforeEach + void setUp() { + client = KAFKA_ENV.srClient(); + subject = "subject." + UUID.randomUUID(); + } @Test void shouldOutputMessagesOnBorkedSchema() throws Exception { + // Given: + final ParsedSchema existingSchema = loadSchema(SCHEMA_BASE + "user_signed_up.avsc"); - final var client = KAFKA_ENV.srClient(); - - client.register( - "subject", - new SchemaReaders.FileSystemSchemaReader() - .readLocal(filename(".avsc")) - .iterator() - .next()); + final int version = client.register(subject, existingSchema); - final var existing = + final List existing = List.of( Schema.builder() .type("AVRO") - .subject("subject") + .subject(subject) .state(Status.STATE.READ) - .schemas( - new SchemaReaders.FileSystemSchemaReader() - .readLocal(filename(".avsc"))) + .schema(existingSchema.copy(version)) .build()); - final var required = + + final List required = List.of( Schema.builder() - .subject("subject") .type("AVRO") + .subject(subject) .state(Status.STATE.READ) - .schemas( - new SchemaReaders.FileSystemSchemaReader() - .readLocal(filename("-v3-bad.avsc"))) + .schema(loadSchema(SCHEMA_BASE + "user_signed_up-v3-bad.avsc")) .build()); final var calculator = SchemaChangeSetCalculators.builder().build(false, client); - final var schemas = calculator.calculate(existing, required); + // When: + final Collection schemas = calculator.calculate(existing, required, DOMAIN_ID); + + // Then: assertThat(schemas.iterator().next().state(), is(Status.STATE.FAILED)); assertThat( schemas.iterator().next().messages(), @@ -78,7 +90,112 @@ void shouldOutputMessagesOnBorkedSchema() throws Exception { assertThat(schemas.iterator().next().messages(), is(containsString("borked"))); } - private static Path filename(final String extra) { - return Path.of("./src/test/resources/schema/" + SCHEMA_FILENAME + extra); + @Test + void shouldDetectWhenSchemasHaveChanged() throws Exception { + // Given: + final ParsedSchema existingSchema = loadSchema(SCHEMA_BASE + "user_signed_up.avsc"); + + final int version = client.register(subject, existingSchema); + + final List existing = + List.of( + Schema.builder() + .type("AVRO") + .subject(subject) + .state(Status.STATE.READ) + .schema(existingSchema.copy(version)) + .build()); + + final List required = + List.of( + Schema.builder() + .type("AVRO") + .subject(subject) + .state(Status.STATE.READ) + .schema(loadSchema(SCHEMA_BASE + "user_signed_up-v2.avsc")) + .build()); + + final var calculator = SchemaChangeSetCalculators.builder().build(false, client); + + // When: + final Collection schemas = calculator.calculate(existing, required, DOMAIN_ID); + + // Then: + assertThat(schemas, hasSize(1)); + final Schema schema = schemas.iterator().next(); + assertThat(schema.state(), is(Status.STATE.UPDATE)); + assertThat(schema.subject(), is(subject)); + assertThat(schema.messages(), is("\n Update")); + } + + @Test + void shouldDetectWhenSchemasHaveNotChanged() throws Exception { + // Given: + final ParsedSchema existingSchema = loadSchema(SCHEMA_BASE + "user_signed_up.avsc"); + + final int version = client.register(subject, existingSchema); + + final List existing = + List.of( + Schema.builder() + .type("AVRO") + .subject(subject) + .state(Status.STATE.READ) + .schema(existingSchema.copy(version)) + .build()); + + final List required = + List.of( + Schema.builder() + .type("AVRO") + .subject(subject) + .state(Status.STATE.READ) + .schema(existingSchema) + .build()); + + final var calculator = SchemaChangeSetCalculators.builder().build(false, client); + + // When: + final Collection schemas = calculator.calculate(existing, required, DOMAIN_ID); + + // Then: + assertThat(schemas, is(empty())); + } + + @Test + void shouldIgnoreSchemasOutsideOfDomain() { + // Given: + final ParsedSchema requiredSchema = loadSchema("other.domain.Common.avsc"); + + final List existing = List.of(); + + final List required = + List.of( + Schema.builder() + .type("AVRO") + .subject(subject) + .state(Status.STATE.READ) + .schema(requiredSchema) + .build()); + + final var calculator = SchemaChangeSetCalculators.builder().build(false, client); + + // When: + final Collection schemas = calculator.calculate(existing, required, DOMAIN_ID); + + // Then: + assertThat(schemas, hasSize(1)); + final Schema schema = schemas.iterator().next(); + assertThat(schema.state(), is(Status.STATE.IGNORED)); + assertThat(schema.subject(), is(subject)); + assertThat(schema.messages(), is("\n ignored as it does not belong to the domain")); + } + + private static ParsedSchema loadSchema(final String fileName) { + return new SchemaReaders.FileSystemSchemaReader() + .readLocal(Path.of("./src/test/resources/schema/" + fileName)) + .iterator() + .next() + .schema(); } } diff --git a/kafka/src/test/java/io/specmesh/kafka/schema/SrSchemaManagerTest.java b/kafka/src/test/java/io/specmesh/kafka/schema/SrSchemaManagerTest.java index 30c7b68e..d8b9aa9b 100644 --- a/kafka/src/test/java/io/specmesh/kafka/schema/SrSchemaManagerTest.java +++ b/kafka/src/test/java/io/specmesh/kafka/schema/SrSchemaManagerTest.java @@ -42,8 +42,8 @@ import java.util.Map; import java.util.Objects; import org.junit.jupiter.api.Test; +import simple.schema_demo.UserSignedUp; import simple.schema_demo._public.user_checkout_value.UserCheckout; -import simple.schema_demo._public.user_signed_up_value.UserSignedUp; class SrSchemaManagerTest { diff --git a/kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java b/kafka/src/test/java/simple/schema_demo/UserSignedUp.java similarity index 93% rename from kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java rename to kafka/src/test/java/simple/schema_demo/UserSignedUp.java index ff73f1cb..8043a295 100644 --- a/kafka-test/src/test/java/simple/schema_demo/_public/user_signed_up_value/UserSignedUp.java +++ b/kafka/src/test/java/simple/schema_demo/UserSignedUp.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package simple.schema_demo._public.user_signed_up_value; +package simple.schema_demo; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml b/kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml deleted file mode 100644 index a47bc1bc..00000000 --- a/kafka/src/test/resources/schema-ref/com.example.single-spec-with-refs-api.yml +++ /dev/null @@ -1,58 +0,0 @@ -asyncapi: '2.4.0' -id: 'urn:com.example.refs' -info: - title: Common Data Set - version: '1.0.0' - description: | - Contains both a TRADE and Currency - where Trade --> Currency, it will provision both - but should determine the ref and process the 0 refs first -servers: - mosquitto: - url: mqtt://test.mosquitto.org - protocol: kafka -channels: - _public.trade: - bindings: - kafka: - envs: - - staging - - prod - partitions: 3 - replicas: 1 - configs: - cleanup.policy: delete - retention.ms: 999000 - publish: - summary: Trade feed - description: Doing clever things - operationId: onTrade received - message: - bindings: - kafka: - schemaIdLocation: "header" - key: - type: string - - schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" - contentType: "application/octet-stream" - payload: - $ref: "/schema/com.example.trading.Trade.avsc" - _public.currency: - bindings: - kafka: - configs: - retention.ms: 999000 - publish: - summary: Currency things - operationId: onCurrencyUpdate - message: - schemaFormat: "application/vnd.apache.avro+json;version=1.9.0" - contentType: "application/octet-stream" - bindings: - kafka: - schemaIdLocation: "header" - schemaLookupStrategy: "RecordNameStrategy" - key: - type: string - payload: - $ref: "/schema/com.example.shared.Currency.avsc" - diff --git a/kafka/src/test/resources/schema-ref/schema/com.example.trading.Trade.avsc b/kafka/src/test/resources/schema-ref/schema/com.example.trading.Trade.avsc index 9dff5db2..e9289d3b 100644 --- a/kafka/src/test/resources/schema-ref/schema/com.example.trading.Trade.avsc +++ b/kafka/src/test/resources/schema-ref/schema/com.example.trading.Trade.avsc @@ -19,6 +19,11 @@ "type": "com.example.shared.Currency", "subject": "com.example.shared.Currency", "doc": "Currency is from another 'domain'." + }, + { + "name": "info", + "type": "TradeInfo", + "subject": "com.example.trading.TradeInfo" } ] } \ No newline at end of file diff --git a/kafka/src/test/resources/schema-ref/schema/com.example.trading.TradeInfo.avsc b/kafka/src/test/resources/schema-ref/schema/com.example.trading.TradeInfo.avsc new file mode 100644 index 00000000..d81f2877 --- /dev/null +++ b/kafka/src/test/resources/schema-ref/schema/com.example.trading.TradeInfo.avsc @@ -0,0 +1,8 @@ +{ + "type": "record", + "name": "TradeInfo", + "namespace": "com.example.trading", + "fields": [ + {"name": "id", "type": "string"} + ] +} \ No newline at end of file diff --git a/kafka/src/test/resources/schema/other.domain.Common.avsc b/kafka/src/test/resources/schema/other.domain.Common.avsc new file mode 100644 index 00000000..35ef04ff --- /dev/null +++ b/kafka/src/test/resources/schema/other.domain.Common.avsc @@ -0,0 +1,8 @@ +{ + "type": "record", + "namespace": "other.domain", + "name": "Common", + "fields": [ + {"name": "thing", "type": "string"} + ] +} \ No newline at end of file diff --git a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v2.avsc b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v2.avsc index 5884e1c9..734a0eb7 100644 --- a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v2.avsc +++ b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v2.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.provision_demo._public.user_signed_up_value", + "namespace": "simple.provision_demo", "name": "UserSignedUp", "fields": [ {"name": "fullName", "type": "string"}, diff --git a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v3-bad.avsc b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v3-bad.avsc index 431f45a5..c160297d 100644 --- a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v3-bad.avsc +++ b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up-v3-bad.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.provision_demo._public.user_signed_up_value", + "namespace": "simple.provision_demo", "name": "UserSignedUp", "fields": [ {"name": "borked", "type": "int"} diff --git a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.avsc b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.avsc index 07b4fc06..8cc5d2aa 100644 --- a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.avsc +++ b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.provision_demo._public.user_signed_up_value", + "namespace": "simple.provision_demo", "name": "UserSignedUp", "fields": [ {"name": "fullName", "type": "string"}, diff --git a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc index ffdb8351..4aa6f329 100644 --- a/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc +++ b/kafka/src/test/resources/schema/simple.provision_demo._public.user_signed_up.key.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.provision_demo._public.user_signed_up_value.key", + "namespace": "simple.provision_demo", "name": "UserSignedUpKey", "fields": [ {"name": "id", "type": "int"} diff --git a/kafka/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc b/kafka/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc index 73361bfc..9c3420c0 100644 --- a/kafka/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc +++ b/kafka/src/test/resources/schema/simple.schema_demo._public.user_signed_up.avsc @@ -1,6 +1,6 @@ { "type": "record", - "namespace": "simple.schema_demo._public.user_signed_up_value", + "namespace": "simple.schema_demo", "name": "UserSignedUp", "fields": [ {"name": "fullName", "type": "string"},