Skip to content

Commit

Permalink
Better Avro schema reference handling.
Browse files Browse the repository at this point in the history
fixes: #406

- `AvroReferenceFinder` added to recursively look for Avro schema references.
- `SchemaReader` now returns an ordered list of schema that need to be registered, in the order they need to be registered
- `SchemaChangeSetCalculators` now ignores the version number when seeing if schema have changed (fixing a bug where it always thought the schema had changed, due to different version numbers).
- `SchemaChangeSetCalculators` now marks any schema outside the domain as `IGNORED`
- `SchemaMutators` includes `IGNORED` schemas, so that users can see that common schemas are not registered.
  • Loading branch information
Andy Coates committed Nov 5, 2024
1 parent ed2cc11 commit 6b3392c
Show file tree
Hide file tree
Showing 36 changed files with 1,408 additions and 473 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@
package io.specmesh.cli;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.specmesh.kafka.DockerKafkaEnvironment;
import io.specmesh.kafka.KafkaEnvironment;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.TopicProvisioner.Topic;
import io.specmesh.kafka.provision.schema.SchemaProvisioner;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -45,6 +54,8 @@ class ProvisionNestedFunctionalTest {
@Test
void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly() {
// Given:
givenCommonSchemaRegistered();

final Provision provision = new Provision();

new CommandLine(provision)
Expand All @@ -67,13 +78,25 @@ void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly(

assertThat(
status.topics().stream().map(Topic::name).collect(Collectors.toSet()),
is(
containsInAnyOrder(
"simple.schema_demo._public.super_user_signed_up",
"simple.schema_demo._public.user_signed_up")));
is(contains("simple.schema_demo._public.super_user_signed_up")));

assertThat(
status.schemas().stream()
.filter(s -> s.state() == Status.STATE.CREATED)
.map(SchemaProvisioner.Schema::subject)
.collect(Collectors.toList()),
containsInAnyOrder(
"simple.schema_demo._public.super_user_signed_up-value",
"simple.schema_demo._public.UserSignedUp"));

assertThat(
status.schemas().stream()
.filter(s -> s.state() == Status.STATE.IGNORED)
.map(SchemaProvisioner.Schema::subject)
.collect(Collectors.toList()),
contains("other.domain.Common.subject"));

assertThat(status.acls(), hasSize(10));
assertThat(status.schemas(), hasSize(2));

// When:
final var statusRepublish = provision.run();
Expand All @@ -82,10 +105,25 @@ void shouldProvisionTopicsAndAclResourcesWithNestedSchemasAndRepublishCorrectly(
assertThat(statusRepublish.failed(), is(false));
assertThat(statusRepublish.topics(), is(empty()));
assertThat(statusRepublish.acls(), is(empty()));
assertThat(
"should be empty, but is 1 due to version number difference when comparing"
+ " references",
statusRepublish.schemas(),
hasSize(1));

final List<?> schemas =
statusRepublish.schemas().stream()
.filter(s -> s.state() != Status.STATE.IGNORED)
.collect(Collectors.toList());

assertThat(schemas, is(empty()));
}

private void givenCommonSchemaRegistered() {
try (SchemaRegistryClient srClient = KAFKA_ENV.srClient()) {
final ParsedSchema schema =
new AvroSchema(
Files.readString(
Path.of(
"./src/test/resources/schema/other.domain.Common.avsc")));
srClient.register("other.domain.Common.subject", schema);
} catch (Exception e) {
throw new AssertionError("failed to register common schema", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import picocli.CommandLine;
import simple.schema_demo._public.UserSignedUp;
import simple.schema_demo.UserSignedUp;

@SuppressFBWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"})
class StorageConsumptionFunctionalTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package simple.schema_demo._public;
package simple.schema_demo;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down
29 changes: 1 addition & 28 deletions cli/src/test/resources/nested_schema_demo-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,6 @@ servers:
url: mqtt://test.mosquitto.org
protocol: mqtt
channels:
_public.user_signed_up:
bindings:
kafka:
envs:
- staging
- prod
partitions: 3
replicas: 1
configs:
cleanup.policy: delete

publish:
operationId: onLightMeasured
message:
bindings:
kafka:
key:
type: long
schemaIdLocation: "payload"
schemaLookupStrategy: "RecordNameStrategy"
schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
contentType: "application/octet-stream"
payload:
$ref: "/schema/simple.schema_demo._public.UserSignedUp.avsc"

_public.super_user_signed_up:
# publish bindings to instruct topic configuration per environment
bindings:
Expand All @@ -58,9 +33,7 @@ channels:
kafka:
key:
type: long
schemaIdLocation: "payload"
schemaLookupStrategy: "RecordNameStrategy"
schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
contentType: "application/octet-stream"
payload:
$ref: "/schema/simple.schema_demo._public.SuperUserSignedUp.avsc"
$ref: "/schema/simple.schema_demo.SuperUserSignedUp.avsc"
8 changes: 8 additions & 0 deletions cli/src/test/resources/schema/other.domain.Common.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"type": "record",
"namespace": "other.domain",
"name": "Common",
"fields": [
{"name": "thing", "type": "string"}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "record",
"namespace": "simple.schema_demo",
"name": "SuperUserSignedUp",
"fields": [
{"name": "role", "type": "string"},
{"name": "user", "type": "simple.schema_demo.UserSignedUp", "subject": "simple.schema_demo._public.UserSignedUp"},
{"name": "common", "type": "other.domain.Common", "subject": "other.domain.Common.subject"}
]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": "record",
"namespace": "simple.schema_demo._public",
"namespace": "simple.schema_demo",
"name": "UserSignedUp",
"fields": [
{"name": "fullName", "type": "string"},
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion cli/src/test/resources/simple_schema_demo-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ channels:
schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
contentType: "application/octet-stream"
payload:
$ref: "/schema/simple.schema_demo._public.UserSignedUp.avsc"
$ref: "/schema/simple.schema_demo.UserSignedUp.avsc"

# PRODUCER/OWNER build pipe will publish schema to SR
_public.user_checkout:
Expand Down
2 changes: 1 addition & 1 deletion cli/src/test/resources/simple_spec_demo-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ channels:
schemaFormat: "application/vnd.apache.avro+json;version=1.9.0"
contentType: "application/octet-stream"
payload:
$ref: "/schema/simple.schema_demo._public.UserSignedUp.avsc"
$ref: "/schema/simple.schema_demo.UserSignedUp.avsc"

_private/user_checkout:
bindings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import simple.schema_demo._public.user_signed_up_value.UserSignedUp;
import simple.schema_demo.UserSignedUp;

class ClientsFunctionalDemoTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package simple.schema_demo._public.user_signed_up_value;
package simple.schema_demo;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": "record",
"namespace": "simple.schema_demo._public.user_signed_up_value",
"namespace": "simple.schema_demo",
"name": "UserSignedUp",
"fields": [
{"name": "fullName", "type": "string"},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": "record",
"namespace": "simple.schema_demo._public.user_signed_up_value",
"namespace": "simple.schema_demo",
"name": "UserSignedUp",
"fields": [
{"name": "fullName", "type": "string"},
Expand Down
Loading

0 comments on commit 6b3392c

Please sign in to comment.