Skip to content

Commit

Permalink
Add compatiblity assertions with clues (#990)
Browse files Browse the repository at this point in the history
* Add compatiblity assertions with clues

* Use internal converters

* Use new trait to avoid breaking bincompat

* format

* Update docs to use assertion
  • Loading branch information
keirlawson authored Oct 21, 2023
1 parent c418343 commit 9572ca0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
4 changes: 1 addition & 3 deletions docs/src/main/mdoc/modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,7 @@ class MySpec extends SchemaSuite {
test("my codec is compatible") {
val myCodec: Codec[String] = ???

val compatibility = checker().checkReaderCompatibility(myCodec, "my-schema-subject").unsafeRunSync()

assertEquals(compatibility.getType(), SchemaCompatibilityType.COMPATIBLE, compatibility.getResult().getIncompatibilities())
val compatibility = checker().assertReaderCompatibility(myCodec, "my-schema-subject").unsafeRunSync()
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility.Incompatibility
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType
import fs2.kafka.internal.syntax._

trait CompatibilityChecker[F[_]] {
def checkReaderCompatibility[A](
Expand All @@ -27,16 +30,34 @@ trait CompatibilityChecker[F[_]] {
): F[SchemaCompatibility.SchemaPairCompatibility]
}

trait AssertableCompatibilityChecker[F[_]] extends CompatibilityChecker[F] {
def assertReaderCompatibility[A](reader: Codec[A], writerSubject: String): F[Unit]

def assertWriterCompatibility[A](writer: Codec[A], readerSubject: String): F[Unit]
}

trait SchemaSuite extends FunSuite {
private def codecAsSchema[A](codec: Codec[A]) = codec.schema.fold(e => fail(e.message), ok => ok)

private def renderIncompatibilities(incompatibilities: List[Incompatibility]): String =
"Schema incompatibilities:\n" + incompatibilities.zipWithIndex
.map({
case (incompatibility, i) =>
s"""${i + 1}) ${incompatibility.getType} - ${incompatibility.getMessage}
|At ${incompatibility.getLocation}
|Reader schema fragment: ${incompatibility.getReaderFragment.toString(true)}
|Writer schema fragment: ${incompatibility.getWriterFragment
.toString(true)}""".stripMargin
})
.mkString("\n-----\n")

def compatibilityChecker(
clientSettings: SchemaRegistryClientSettings[IO],
name: String = "schema-compatibility-checker"
) = new Fixture[CompatibilityChecker[IO]](name) {
private var checker: CompatibilityChecker[IO] = null
) = new Fixture[AssertableCompatibilityChecker[IO]](name) {
private var checker: AssertableCompatibilityChecker[IO] = null

override def apply(): CompatibilityChecker[IO] = checker
override def apply(): AssertableCompatibilityChecker[IO] = checker

override def beforeAll(): Unit =
checker = newCompatibilityChecker(clientSettings)
Expand All @@ -45,10 +66,10 @@ trait SchemaSuite extends FunSuite {

def newCompatibilityChecker(
clientSettings: SchemaRegistryClientSettings[IO]
): IO[CompatibilityChecker[IO]] =
): IO[AssertableCompatibilityChecker[IO]] =
clientSettings.createSchemaRegistryClient
.map { client =>
new CompatibilityChecker[IO] {
new AssertableCompatibilityChecker[IO] {
private def registrySchema(subject: String): IO[Schema] =
for {
metadata <- IO.delay(client.getLatestSchemaMetadata(subject))
Expand Down Expand Up @@ -82,6 +103,34 @@ trait SchemaSuite extends FunSuite {
)
}
}

def assertWriterCompatibility[A](
writer: Codec[A],
readerSubject: String
): IO[Unit] =
checkReaderCompatibility(writer, readerSubject).flatMap { compat =>
IO.delay {
assertEquals(
compat.getResult().getCompatibility(),
SchemaCompatibilityType.COMPATIBLE,
renderIncompatibilities(compat.getResult.getIncompatibilities.toList)
)
}
}

def assertReaderCompatibility[A](
reader: Codec[A],
writerSubject: String
): IO[Unit] =
checkReaderCompatibility(reader, writerSubject).flatMap { compat =>
IO.delay {
assertEquals(
compat.getResult().getCompatibility(),
SchemaCompatibilityType.COMPATIBLE,
renderIncompatibilities(compat.getResult.getIncompatibilities.toList)
)
}
}
}
}
}

0 comments on commit 9572ca0

Please sign in to comment.