Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[avro] Use SchemaBuilder API #1003

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ jobs:

- name: Test
env:
JAVA_OPTS: '-Davro.version=1.8.2'
JAVA_OPTS: '-Davro.version=1.9.2'
run: sbt '++ ${{ matrix.scala }}' avro/test

site:
Expand Down
84 changes: 40 additions & 44 deletions avro/src/main/scala/magnolify/avro/AvroType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import magnolify.shared.{doc => _, _}
import magnolify.shims.FactoryCompat
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.avro.generic._
import org.apache.avro.{JsonProperties, LogicalType, LogicalTypes, Schema}
import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder}
import org.joda.{time => joda}

import java.nio.{ByteBuffer, ByteOrder}
Expand Down Expand Up @@ -105,34 +105,26 @@ object AvroField {
}
} else {
new Record[T] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema
.createRecord(
caseClass.typeName.short,
getDoc(caseClass.annotations, caseClass.typeName.full),
caseClass.typeName.owner,
false,
caseClass.parameters.map { p =>
new Schema.Field(
cm.map(p.label),
p.typeclass.schema(cm),
getDoc(p.annotations, s"${caseClass.typeName.full}#${p.label}"),
p.default
.map(d => p.typeclass.makeDefault(d)(cm))
.getOrElse(p.typeclass.fallbackDefault)
)
}.asJava
)

// `JacksonUtils.toJson` expects `Map[String, Any]` for `RECORD` defaults
override def makeDefault(d: T)(cm: CaseMapper): ju.Map[String, Any] = {
override protected def buildSchema(cm: CaseMapper): Schema = {
val builder = SchemaBuilder
.record(caseClass.typeName.short)
.namespace(caseClass.typeName.owner)
.doc(getDoc(caseClass.annotations, caseClass.typeName.full))
.fields()

caseClass.parameters
.map { p =>
val name = cm.map(p.label)
val value = p.typeclass.makeDefault(p.dereference(d))(cm)
name -> value
.foldLeft(builder) { (b, p) =>
val f = b
.name(cm.map(p.label))
.doc(getDoc(p.annotations, s"${caseClass.typeName.full}#${p.label}"))
.`type`(p.typeclass.schema(cm))

p.default match {
case Some(d) => f.withDefault(p.typeclass.makeDefault(d)(cm))
case None => f.noDefault()
}
}
.toMap
.asJava
.endRecord()
}

override def from(v: GenericRecord)(cm: CaseMapper): T =
Expand Down Expand Up @@ -198,9 +190,7 @@ object AvroField {
implicit val afFloat: AvroField[Float] = id[Float](Schema.Type.FLOAT)
implicit val afDouble: AvroField[Double] = id[Double](Schema.Type.DOUBLE)
implicit val afByteBuffer: AvroField[ByteBuffer] = new Aux[ByteBuffer, ByteBuffer, ByteBuffer] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.create(Schema.Type.BYTES)
// `JacksonUtils.toJson` expects `Array[Byte]` for `BYTES` defaults
override def makeDefault(d: ByteBuffer)(cm: CaseMapper): Array[Byte] = d.array()
override protected def buildSchema(cm: CaseMapper): Schema = SchemaBuilder.builder().bytesType()
// Copy to avoid issues if the original buffer is reused
override def from(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = {
val ptr = v.asReadOnlyBuffer()
Expand All @@ -214,7 +204,7 @@ object AvroField {
implicit val afCharSequence: AvroField[CharSequence] = id[CharSequence](Schema.Type.STRING)
implicit val afString: AvroField[String] = new Aux[String, String, String] {
override protected def buildSchema(cm: CaseMapper): Schema = {
val schema = Schema.create(Schema.Type.STRING)
val schema = SchemaBuilder.builder().stringType()
GenericData.setStringType(schema, GenericData.StringType.String)
schema
}
Expand All @@ -226,24 +216,24 @@ object AvroField {
// Avro 1.9+ added a type parameter for `GenericEnumSymbol`, breaking 1.8 compatibility
// Some readers, e.g. `AvroParquetReader`, read enums as `Utf8`
new Aux[T, AnyRef, EnumSymbol] {
override protected def buildSchema(cm: CaseMapper): Schema = {
val doc = getDoc(et.annotations, s"Enum ${et.namespace}.${et.name}")
Schema.createEnum(et.name, doc, et.namespace, et.values.asJava)
}
// `JacksonUtils.toJson` expects `String` for `ENUM` defaults
override def makeDefault(d: T)(cm: CaseMapper): String = et.to(d)
override protected def buildSchema(cm: CaseMapper): Schema =
SchemaBuilder
.enumeration(et.name)
.doc(getDoc(et.annotations, s"Enum ${et.namespace}.${et.name}"))
.namespace(et.namespace)
.symbols(et.values: _*)
override def from(v: FromT)(cm: CaseMapper): T = et.from(v.toString)
override def to(v: T)(cm: CaseMapper): ToT = new GenericData.EnumSymbol(schema(cm), v)
}

implicit def afOption[T](implicit f: AvroField[T]): AvroField[Option[T]] =
new Aux[Option[T], f.FromT, f.ToT] {
override protected def buildSchema(cm: CaseMapper): Schema =
Schema.createUnion(Schema.create(Schema.Type.NULL), f.schema(cm))
SchemaBuilder.unionOf().nullType.and().`type`(f.schema(cm)).endUnion()
// `Option[T]` is a `UNION` of `[NULL, T]` and must default to the first type, `NULL`
override def makeDefault(d: Option[T])(cm: CaseMapper): JsonProperties.Null = {
override def makeDefault(d: Option[T])(cm: CaseMapper): Null = {
require(d.isEmpty, "Option[T] can only default to None")
JsonProperties.NULL_VALUE
null
}
override def from(v: f.FromT)(cm: CaseMapper): Option[T] =
if (v == null) None else Some(f.from(v)(cm))
Expand All @@ -259,7 +249,8 @@ object AvroField {
fc: FactoryCompat[T, C[T]]
): AvroField[C[T]] =
new Aux[C[T], ju.List[f.FromT], GenericArray[f.ToT]] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.createArray(f.schema(cm))
override protected def buildSchema(cm: CaseMapper): Schema =
SchemaBuilder.array().items(f.schema(cm))
override def fallbackDefault: ju.List[f.ToT] = ju.Collections.emptyList()
override def from(v: ju.List[f.FromT])(cm: CaseMapper): C[T] =
fc.fromSpecific(v.asScala.iterator.map(p => f.from(p)(cm)))
Expand All @@ -269,7 +260,8 @@ object AvroField {

implicit def afCharSequenceMap[T](implicit f: AvroField[T]): AvroField[Map[CharSequence, T]] =
new Aux[Map[CharSequence, T], ju.Map[CharSequence, f.FromT], ju.Map[CharSequence, f.ToT]] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.createMap(f.schema(cm))
override protected def buildSchema(cm: CaseMapper): Schema =
SchemaBuilder.map().values(f.schema(cm))
override def fallbackDefault: ju.Map[CharSequence, f.ToT] = ju.Collections.emptyMap()
override def from(v: ju.Map[CharSequence, f.FromT])(cm: CaseMapper): Map[CharSequence, T] =
v.asScala.map { case (k, v) => k -> f.from(v)(cm) }.toMap
Expand All @@ -280,7 +272,7 @@ object AvroField {
implicit def afStringMap[T](implicit f: AvroField[T]): AvroField[Map[String, T]] =
new Aux[Map[String, T], ju.Map[String, f.FromT], ju.Map[String, f.ToT]] {
override protected def buildSchema(cm: CaseMapper): Schema = {
val schema = Schema.createMap(f.schema(cm))
val schema = SchemaBuilder.map().values(f.schema(cm))
GenericData.setStringType(schema, GenericData.StringType.String)
schema
}
Expand Down Expand Up @@ -361,7 +353,11 @@ object AvroField {
override protected def buildSchema(cm: CaseMapper): Schema = {
val n = ReflectionUtils.name[T]
val ns = ReflectionUtils.namespace[T]
Schema.createFixed(n, getDoc(an.annotations, n), ns, size)
SchemaBuilder
.fixed(n)
.namespace(ns)
.doc(getDoc(an.annotations, n))
.size(size)
}

override def from(v: GenericFixed)(cm: CaseMapper): T = f(v.bytes())
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ ThisBuild / githubWorkflowAddedJobs ++= Seq(
List(
WorkflowStep.Sbt(
List("avro/test"),
env = Map("JAVA_OPTS" -> "-Davro.version=1.8.2"),
env = Map("JAVA_OPTS" -> "-Davro.version=1.9.2"),
name = Some("Test")
)
),
Expand Down
Loading