From 61e5581de749746dbe1f63436465a7de3ffcca74 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 9 Jul 2024 09:30:25 -0700 Subject: [PATCH 01/12] Derive AvroCompat automatically on read --- .../magnolify/parquet/ParquetField.scala | 61 +++++++++++-------- .../scala/magnolify/parquet/ParquetType.scala | 27 ++++++-- .../scala/magnolify/parquet/Predicate.scala | 2 +- .../main/scala/magnolify/parquet/Schema.scala | 15 +++++ 4 files changed, 71 insertions(+), 34 deletions(-) diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala index 2f2cc902e..2458adca4 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala @@ -51,7 +51,7 @@ sealed trait ParquetField[T] extends Serializable { protected final def nonEmpty(v: T): Boolean = !isEmpty(v) def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit - def newConverter: TypeConverter[T] + def newConverter(writerSchema: Type): TypeConverter[T] protected def writeGroup(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { if (isGroup) { @@ -83,8 +83,9 @@ object ParquetField { override protected def isEmpty(v: T): Boolean = tc.isEmpty(p.dereference(v)) override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = tc.writeGroup(c, p.dereference(v))(cm) - override def newConverter: TypeConverter[T] = { - val buffered = tc.newConverter + override def newConverter(writerSchema: Type): TypeConverter[T] = { + val buffered = tc + .newConverter(writerSchema) .asInstanceOf[TypeConverter.Buffered[p.PType]] new TypeConverter.Delegate[p.PType, T](buffered) { override def get: T = inner.get(b => caseClass.construct(_ => b.head)) @@ -139,9 +140,10 @@ object ParquetField { } } - override def newConverter: TypeConverter[T] = + override def newConverter(writerSchema: Type): TypeConverter[T] = new GroupConverter with TypeConverter.Buffered[T] { - private val fieldConverters = caseClass.parameters.map(_.typeclass.newConverter) + private val fieldConverters = + caseClass.parameters.map(_.typeclass.newConverter(writerSchema)) override def isPrimitive: Boolean = false @@ -191,8 +193,9 @@ object ParquetField { new Primitive[U] { override def buildSchema(cm: CaseMapper): Type = pf.schema(cm) override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) - override def newConverter: TypeConverter[U] = - pf.newConverter.asInstanceOf[TypeConverter.Primitive[T]].map(f) + override def newConverter(writerSchema: Type): TypeConverter[U] = + pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f) + override type ParquetT = pf.ParquetT } } @@ -215,7 +218,7 @@ object ParquetField { new Primitive[T] { override def buildSchema(cm: CaseMapper): Type = Schema.primitive(ptn, lta) override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = f(c)(v) - override def newConverter: TypeConverter[T] = g + override def newConverter(writerSchema: Type): TypeConverter[T] = g override type ParquetT = UnderlyingT } @@ -291,8 +294,9 @@ object ParquetField { override def write(c: RecordConsumer, v: Option[T])(cm: CaseMapper): Unit = v.foreach(t.writeGroup(c, _)(cm)) - override def newConverter: TypeConverter[Option[T]] = { - val buffered = t.newConverter + override def newConverter(writerSchema: Type): TypeConverter[Option[T]] = { + val buffered = t + .newConverter(writerSchema) .asInstanceOf[TypeConverter.Buffered[T]] .withRepetition(Repetition.OPTIONAL) new TypeConverter.Delegate[T, 
Option[T]](buffered) { @@ -339,15 +343,16 @@ object ParquetField { v.foreach(t.writeGroup(c, _)(cm)) } - override def newConverter: TypeConverter[C[T]] = { - val buffered = t.newConverter + override def newConverter(writerSchema: Type): TypeConverter[C[T]] = { + val buffered = t + .newConverter(writerSchema) .asInstanceOf[TypeConverter.Buffered[T]] .withRepetition(Repetition.REPEATED) val arrayConverter = new TypeConverter.Delegate[T, C[T]](buffered) { override def get: C[T] = inner.get(fc.fromSpecific) } - if (hasAvroArray) { + if (Schema.hasGroupedArray(writerSchema)) { new GroupConverter with TypeConverter.Buffered[C[T]] { override def getConverter(fieldIndex: Int): Converter = { require(fieldIndex == 0, "Avro array field index != 0") @@ -421,10 +426,10 @@ object ParquetField { } } - override def newConverter: TypeConverter[Map[K, V]] = { + override def newConverter(writerSchema: Type): TypeConverter[Map[K, V]] = { val kvConverter = new GroupConverter with TypeConverter.Buffered[(K, V)] { - private val keyConverter = pfKey.newConverter - private val valueConverter = pfValue.newConverter + private val keyConverter = pfKey.newConverter(writerSchema) + private val valueConverter = pfValue.newConverter(writerSchema) private val fieldConverters = Array(keyConverter, valueConverter) override def isPrimitive: Boolean = false @@ -466,8 +471,8 @@ object ParquetField { def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { override def buildSchema(cm: CaseMapper): Type = Schema.setLogicalType(pf.schema(cm), lta) override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) - override def newConverter: TypeConverter[U] = - pf.newConverter.asInstanceOf[TypeConverter.Primitive[T]].map(f) + override def newConverter(writerSchema: Type): TypeConverter[U] = + pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f) override type ParquetT = pf.ParquetT } @@ -509,9 +514,10 @@ object ParquetField { override def write(c: RecordConsumer, v: BigDecimal)(cm: CaseMapper): Unit = c.addBinary(Binary.fromConstantByteArray(Decimal.toFixed(v, precision, scale, length))) - override def newConverter: TypeConverter[BigDecimal] = TypeConverter.newByteArray.map { ba => - Decimal.fromBytes(ba, precision, scale) - } + override def newConverter(writerSchema: Type): TypeConverter[BigDecimal] = + TypeConverter.newByteArray.map { ba => + Decimal.fromBytes(ba, precision, scale) + } override type ParquetT = Binary } @@ -544,12 +550,13 @@ object ParquetField { ) ) - override def newConverter: TypeConverter[UUID] = TypeConverter.newByteArray.map { ba => - val bb = ByteBuffer.wrap(ba) - val h = bb.getLong - val l = bb.getLong - new UUID(h, l) - } + override def newConverter(writerSchema: Type): TypeConverter[UUID] = + TypeConverter.newByteArray.map { ba => + val bb = ByteBuffer.wrap(ba) + val h = bb.getLong + val l = bb.getLong + new UUID(h, l) + } override type ParquetT = Binary } diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala index 6003bfaf8..829bfcd58 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala @@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.{ } import org.apache.parquet.io.api._ import org.apache.parquet.io.{InputFile, OutputFile} -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{MessageType, Type} import org.slf4j.LoggerFactory 
import org.typelevel.scalaccompat.annotation.nowarn @@ -73,7 +73,7 @@ sealed trait ParquetType[T] extends Serializable { def writeBuilder(file: OutputFile): WriteBuilder[T] = new WriteBuilder(file, writeSupport) def write(c: RecordConsumer, v: T): Unit = () - def newConverter: TypeConverter[T] = null + def newConverter(writerSchema: Type): TypeConverter[T] = null } object ParquetType { @@ -97,8 +97,10 @@ object ParquetType { override val avroCompat: Boolean = pa == ParquetArray.AvroCompat.avroCompat || f.hasAvroArray + override def write(c: RecordConsumer, v: T): Unit = r.write(c, v)(cm) - override def newConverter: TypeConverter[T] = r.newConverter + override def newConverter(writerSchema: Type): TypeConverter[T] = + r.newConverter(writerSchema) } case _ => throw new IllegalArgumentException(s"ParquetType can only be created from Record. Got $f") @@ -151,9 +153,22 @@ object ParquetType { ) } - val requestedSchema = Schema.message(parquetType.schema) + val requestedSchema = { + val s = Schema.message(parquetType.schema) + // If reading Avro, roundtrip schema using parquet-avro converter to ensure array compatibility; + // magnolify-parquet does not automatically wrap repeated fields into a group like parquet-avro does + if (isAvroFile) { + val converter = new AvroSchemaConverter() + converter.convert(converter.convert(s)) + } else { + s + } + } Schema.checkCompatibility(context.getFileSchema, requestedSchema) - new hadoop.ReadSupport.ReadContext(requestedSchema, java.util.Collections.emptyMap()) + new hadoop.ReadSupport.ReadContext( + requestedSchema, + java.util.Collections.emptyMap() + ) } override def prepareForRead( @@ -163,7 +178,7 @@ object ParquetType { readContext: hadoop.ReadSupport.ReadContext ): RecordMaterializer[T] = new RecordMaterializer[T] { - private val root = parquetType.newConverter + private val root = parquetType.newConverter(fileSchema) override def getCurrentRecord: T = root.get override def getRootConverter: GroupConverter = root.asGroupConverter() } diff --git a/parquet/src/main/scala/magnolify/parquet/Predicate.scala b/parquet/src/main/scala/magnolify/parquet/Predicate.scala index 83d9dd4a3..a1e0ce927 100644 --- a/parquet/src/main/scala/magnolify/parquet/Predicate.scala +++ b/parquet/src/main/scala/magnolify/parquet/Predicate.scala @@ -65,7 +65,7 @@ object Predicate { } def wrap[T](addFn: (PrimitiveConverter, T) => Unit): T => ScalaFieldT = { - lazy val converter = pf.newConverter + lazy val converter = pf.newConverter(pf.schema(CaseMapper.identity)) value => { addFn(converter.asPrimitiveConverter(), value) converter.get diff --git a/parquet/src/main/scala/magnolify/parquet/Schema.scala b/parquet/src/main/scala/magnolify/parquet/Schema.scala index e978b22d6..e58fae85b 100644 --- a/parquet/src/main/scala/magnolify/parquet/Schema.scala +++ b/parquet/src/main/scala/magnolify/parquet/Schema.scala @@ -95,6 +95,21 @@ private object Schema { builder.named(schema.getName) } + // Check if writer schema encodes arrays as a single repeated field inside of an optional or required group + private[parquet] def hasGroupedArray(writer: Type): Boolean = + !writer.isPrimitive && writer.asGroupType().getFields.asScala.exists { + case f if isGroupedArrayType(f) => true + case f if !f.isPrimitive => f.asGroupType().getFields.asScala.exists(hasGroupedArray) + case _ => false + } + + private def isGroupedArrayType(f: Type): Boolean = + !f.isPrimitive && + f.getLogicalTypeAnnotation == LogicalTypeAnnotation.listType() && { + val fields = f.asGroupType().getFields.asScala + fields.size 
== 1 && fields.head.isRepetition(Repetition.REPEATED) + } + def checkCompatibility(writer: Type, reader: Type): Unit = { def listFields(gt: GroupType) = s"[${gt.getFields.asScala.map(f => s"${f.getName}: ${f.getRepetition}").mkString(",")}]" From a1ee14945959b646aabbfdf4cd4a1c571ab5e766 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 9 Jul 2024 10:03:07 -0700 Subject: [PATCH 02/12] Add test --- .../magnolify/parquet/ParquetTypeSuite.scala | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala index dbbb7bb8f..275e406f3 100644 --- a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala @@ -27,8 +27,13 @@ import magnolify.shared.doc import magnolify.shared.TestEnumType._ import magnolify.test.Simple._ import magnolify.test._ +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.io._ +import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.Types import org.scalacheck._ import java.io.ByteArrayInputStream @@ -192,6 +197,107 @@ class ParquetTypeSuite extends MagnolifySuite { assertEquals(inner.getFields.asScala.map(_.getName).toSeq, Seq("INNERFIRST")) } } + + test(s"AvroCompat") { + val pt = ParquetType[WithList] + // Assert that by default, Magnolify doesn't wrap repeated fields in group types + assert(!Schema.hasGroupedArray(pt.schema)) + assertEquals( + """|message magnolify.parquet.WithList { + | required binary s (STRING); + | repeated binary l (STRING); + |} + |""".stripMargin, + pt.schema.toString + ) + + // Construct Magnolify read support for a Parquet file written using parquet-avro + val readSupport = pt.readSupport.init( + new InitContext( + new Configuration(), + Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava, + // Build expected writer schema, which uses Avro array grouping + Types + .buildMessage() + .addField(Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("s")) + .addField( + Types + .buildGroup(Repetition.REQUIRED) + .addField( + Types + .primitive(PrimitiveTypeName.BINARY, Repetition.REPEATED) + .named("array") + ) + .named("list") + ) + .named("WithList") + ) + ) + + // Assert that Magnolify is using a compatible array schema to read + assert(Schema.hasGroupedArray(readSupport.getRequestedSchema)) + assertEquals( + """|message magnolify.parquet.WithList { + | required binary s (STRING); + | required group l (LIST) { + | repeated binary array (STRING); + | } + |} + |""".stripMargin, + readSupport.getRequestedSchema.toString + ) + } + + test(s"AvroCompat") { + val pt = ParquetType[WithList] + + // Assert that by default, Magnolify doesn't wrap repeated fields in group types + assert(!Schema.hasGroupedArray(pt.schema)) + assertEquals( + """|message magnolify.parquet.WithList { + | required binary s (STRING); + | repeated binary l (STRING); + |} + |""".stripMargin, + pt.schema.toString + ) + + // Construct Magnolify read support for a Parquet file written using parquet-avro + val readSupport = pt.readSupport.init( + new InitContext( + new Configuration(), + Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava, + // Build expected writer schema, which uses parquet-avro-style array 
grouping + Types + .buildMessage() + .addField(Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("s")) + .addField( + Types + .buildGroup(Repetition.REQUIRED) + .addField( + Types + .primitive(PrimitiveTypeName.BINARY, Repetition.REPEATED) + .named("array") + ) + .named("list") + ) + .named("WithList") + ) + ) + + // Assert that Magnolify is using a compatible array schema to read + assert(Schema.hasGroupedArray(readSupport.getRequestedSchema)) + assertEquals( + """|message magnolify.parquet.WithList { + | required binary s (STRING); + | required group l (LIST) { + | repeated binary array (STRING); + | } + |} + |""".stripMargin, + readSupport.getRequestedSchema.toString + ) + } } case class Unsafe(c: Char) @@ -225,6 +331,8 @@ case class ParquetNestedListDoc( i: List[Integers] ) +case class WithList(s: String, l: List[String]) + class TestInputFile(ba: Array[Byte]) extends InputFile { private val bais = new ByteArrayInputStream(ba) override def getLength: Long = ba.length.toLong From 81e3f0711473ff5352ee5d6c13c35c578ca5ecc6 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 12 Sep 2024 11:01:43 -0400 Subject: [PATCH 03/12] [Very WIP] Make writes Configurable --- .../parquet/MagnolifyParquetProperties.scala | 39 +++++ .../magnolify/parquet/ParquetField.scala | 134 +++++++++++------- .../scala/magnolify/parquet/ParquetType.scala | 107 +++++++------- .../scala/magnolify/parquet/Predicate.scala | 6 +- .../main/scala/magnolify/parquet/Schema.scala | 4 +- .../magnolify/parquet/AvroParquetSuite.scala | 8 +- .../magnolify/parquet/ParquetTypeSuite.scala | 130 ++++++----------- .../parquet/SchemaEvolutionSuite.scala | 1 + 8 files changed, 229 insertions(+), 200 deletions(-) create mode 100644 parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala diff --git a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala new file mode 100644 index 000000000..e3c1824e2 --- /dev/null +++ b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package magnolify.parquet + +import org.apache.hadoop.conf.Configuration + +import java.util.Objects + +object MagnolifyParquetProperties { + val WriteGroupedArrays: String = "magnolify.parquet.write-grouped-arrays" + val WriteGroupedArraysDefault: Boolean = false + + val WriteAvroSchemaToMetadata: String = "magnolify.parquet.write-avro-schema" + val WriteAvroSchemaToMetadataDefault: Boolean = true + + val ReadTypeKey = "parquet.type.read.type" + val WriteTypeKey = "parquet.type.write.type" + + // Hash any Configuration values that might affect schema creation to use as part of Schema cache key + private[parquet] def hashValues(conf: Configuration): Int = { + Objects.hash( + Option(conf.get(WriteGroupedArrays)).map(_.toBoolean).getOrElse(WriteGroupedArraysDefault) + ) + } +} diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala index 2458adca4..e9585bd8e 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala @@ -23,42 +23,54 @@ import magnolify.shims.FactoryCompat import java.nio.{ByteBuffer, ByteOrder} import java.time.LocalDate import java.util.UUID +import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.io.api._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types} +import org.typelevel.scalaccompat.annotation.nowarn import scala.annotation.implicitNotFound import scala.collection.concurrent -import scala.collection.compat._ sealed trait ParquetField[T] extends Serializable { - @transient private lazy val schemaCache: concurrent.Map[UUID, Type] = + @transient private lazy val schemaCache: concurrent.Map[Int, concurrent.Map[UUID, Type]] = concurrent.TrieMap.empty - protected def buildSchema(cm: CaseMapper): Type - def schema(cm: CaseMapper): Type = - schemaCache.getOrElseUpdate(cm.uuid, buildSchema(cm)) + protected def buildSchema(cm: CaseMapper, conf: Configuration): Type + + def schema(cm: CaseMapper, conf: Configuration): Type = { + val confHash = MagnolifyParquetProperties.hashValues(conf) + + if (!schemaCache.contains(confHash)) { + schemaCache.put(confHash, concurrent.TrieMap.empty) + } + + schemaCache(confHash).getOrElseUpdate( + cm.uuid, + buildSchema(cm, Option(conf).getOrElse(new Configuration())) + ) + } - val hasAvroArray: Boolean = false def fieldDocs(cm: CaseMapper): Map[String, String] def typeDoc: Option[String] - protected val isGroup: Boolean = false + protected def isGroup(conf: Configuration): Boolean = false protected def isEmpty(v: T): Boolean protected final def nonEmpty(v: T): Boolean = !isEmpty(v) - def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit + def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit def newConverter(writerSchema: Type): TypeConverter[T] - protected def writeGroup(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { - if (isGroup) { + protected def writeGroup(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = { + val wrapGroup = isGroup(conf) + if (wrapGroup) { c.startGroup() } - write(c, v)(cm) - if (isGroup) { + write(c, v, conf)(cm) + if (wrapGroup) { c.endGroup() } } @@ -66,7 +78,7 @@ sealed trait ParquetField[T] extends Serializable { object ParquetField { sealed trait Record[T] extends ParquetField[T] { - override protected 
val isGroup: Boolean = true + override protected def isGroup(conf: Configuration): Boolean = true override protected def isEmpty(v: T): Boolean = false } @@ -79,10 +91,11 @@ object ParquetField { val p = caseClass.parameters.head val tc = p.typeclass new ParquetField[T] { - override protected def buildSchema(cm: CaseMapper): Type = tc.buildSchema(cm) + override protected def buildSchema(cm: CaseMapper, conf: Configuration): Type = + tc.buildSchema(cm, conf) override protected def isEmpty(v: T): Boolean = tc.isEmpty(p.dereference(v)) - override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = - tc.writeGroup(c, p.dereference(v))(cm) + override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = + tc.writeGroup(c, p.dereference(v), conf)(cm) override def newConverter(writerSchema: Type): TypeConverter[T] = { val buffered = tc .newConverter(writerSchema) @@ -96,15 +109,13 @@ object ParquetField { } } else { new Record[T] { - override def buildSchema(cm: CaseMapper): Type = + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = caseClass.parameters .foldLeft(Types.requiredGroup()) { (g, p) => - g.addField(Schema.rename(p.typeclass.schema(cm), cm.map(p.label))) + g.addField(Schema.rename(p.typeclass.schema(cm, conf), cm.map(p.label))) } .named(caseClass.typeName.full) - override val hasAvroArray: Boolean = caseClass.parameters.exists(_.typeclass.hasAvroArray) - override def fieldDocs(cm: CaseMapper): Map[String, String] = caseClass.parameters.flatMap { param => val label = cm.map(param.label) @@ -128,13 +139,13 @@ object ParquetField { s"Type ${caseClass.typeName}" ) - override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { + override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = { caseClass.parameters.foreach { p => val x = p.dereference(v) if (p.typeclass.nonEmpty(x)) { val name = cm.map(p.label) c.startField(name, p.index) - p.typeclass.writeGroup(c, x)(cm) + p.typeclass.writeGroup(c, x, conf)(cm) c.endField(name, p.index) } } @@ -191,8 +202,9 @@ object ParquetField { class FromWord[T] { def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { - override def buildSchema(cm: CaseMapper): Type = pf.schema(cm) - override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = pf.schema(cm, conf) + override def write(c: RecordConsumer, v: U, conf: Configuration)(cm: CaseMapper): Unit = + pf.write(c, g(v), conf)(cm) override def newConverter(writerSchema: Type): TypeConverter[U] = pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f) @@ -216,8 +228,10 @@ object ParquetField { lta: => LogicalTypeAnnotation = null ): Primitive[T] = new Primitive[T] { - override def buildSchema(cm: CaseMapper): Type = Schema.primitive(ptn, lta) - override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = f(c)(v) + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + Schema.primitive(ptn, lta) + override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = + f(c)(v) override def newConverter(writerSchema: Type): TypeConverter[T] = g override type ParquetT = UnderlyingT } @@ -283,16 +297,18 @@ object ParquetField { implicit def pfOption[T](implicit t: ParquetField[T]): ParquetField[Option[T]] = new ParquetField[Option[T]] { - override def buildSchema(cm: CaseMapper): Type = - 
Schema.setRepetition(t.schema(cm), Repetition.OPTIONAL) + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + Schema.setRepetition(t.schema(cm, conf), Repetition.OPTIONAL) override protected def isEmpty(v: Option[T]): Boolean = v.forall(t.isEmpty) override def fieldDocs(cm: CaseMapper): Map[String, String] = t.fieldDocs(cm) override val typeDoc: Option[String] = None - override def write(c: RecordConsumer, v: Option[T])(cm: CaseMapper): Unit = - v.foreach(t.writeGroup(c, _)(cm)) + override def write(c: RecordConsumer, v: Option[T], conf: Configuration)( + cm: CaseMapper + ): Unit = + v.foreach(t.writeGroup(c, _, conf)(cm)) override def newConverter(writerSchema: Type): TypeConverter[Option[T]] = { val buffered = t @@ -313,14 +329,17 @@ object ParquetField { pa: ParquetArray ): ParquetField[C[T]] = { new ParquetField[C[T]] { - override val hasAvroArray: Boolean = pa match { + // Legacy compat with Magnolify <= 0.7; future versions will remove AvroCompat in favor of + // Configuration-based approach + @nowarn("cat=deprecation") + val groupAvroArrays: Boolean = pa match { case ParquetArray.default => false case ParquetArray.AvroCompat.avroCompat => true } - override def buildSchema(cm: CaseMapper): Type = { - val repeatedSchema = Schema.setRepetition(t.schema(cm), Repetition.REPEATED) - if (hasAvroArray) { + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = { + val repeatedSchema = Schema.setRepetition(t.schema(cm, conf), Repetition.REPEATED) + if (isGroup(conf)) { Types .requiredGroup() .addField(Schema.rename(repeatedSchema, AvroArrayField)) @@ -331,16 +350,21 @@ object ParquetField { } } - override protected val isGroup: Boolean = hasAvroArray + override protected def isGroup(conf: Configuration): Boolean = + groupAvroArrays || conf.getBoolean( + MagnolifyParquetProperties.WriteGroupedArrays, + MagnolifyParquetProperties.WriteGroupedArraysDefault + ) + override protected def isEmpty(v: C[T]): Boolean = v.forall(t.isEmpty) - override def write(c: RecordConsumer, v: C[T])(cm: CaseMapper): Unit = - if (hasAvroArray) { + override def write(c: RecordConsumer, v: C[T], conf: Configuration)(cm: CaseMapper): Unit = + if (isGroup(conf)) { c.startField(AvroArrayField, 0) - v.foreach(t.writeGroup(c, _)(cm)) + v.foreach(t.writeGroup(c, _, conf)(cm)) c.endField(AvroArrayField, 0) } else { - v.foreach(t.writeGroup(c, _)(cm)) + v.foreach(t.writeGroup(c, _, conf)(cm)) } override def newConverter(writerSchema: Type): TypeConverter[C[T]] = { @@ -381,10 +405,10 @@ object ParquetField { pfValue: ParquetField[V] ): ParquetField[Map[K, V]] = { new ParquetField[Map[K, V]] { - override def buildSchema(cm: CaseMapper): Type = { - val keySchema = Schema.rename(pfKey.schema(cm), KeyField) + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = { + val keySchema = Schema.rename(pfKey.schema(cm, conf), KeyField) require(keySchema.isRepetition(Repetition.REQUIRED), "Map key must be required") - val valueSchema = Schema.rename(pfValue.schema(cm), ValueField) + val valueSchema = Schema.rename(pfValue.schema(cm, conf), ValueField) val keyValue = Types .repeatedGroup() .addField(keySchema) @@ -397,26 +421,26 @@ object ParquetField { .named("map") } - override val hasAvroArray: Boolean = pfKey.hasAvroArray || pfValue.hasAvroArray - override protected def isEmpty(v: Map[K, V]): Boolean = v.isEmpty override def fieldDocs(cm: CaseMapper): Map[String, String] = Map.empty override val typeDoc: Option[String] = None - override def write(c: RecordConsumer, v: Map[K, V])(cm: 
CaseMapper): Unit = { + override def write(c: RecordConsumer, v: Map[K, V], conf: Configuration)( + cm: CaseMapper + ): Unit = { if (v.nonEmpty) { c.startGroup() c.startField(KeyValueGroup, 0) v.foreach { case (k, v) => c.startGroup() c.startField(KeyField, 0) - pfKey.writeGroup(c, k)(cm) + pfKey.writeGroup(c, k, conf)(cm) c.endField(KeyField, 0) if (pfValue.nonEmpty(v)) { c.startField(ValueField, 1) - pfValue.writeGroup(c, v)(cm) + pfValue.writeGroup(c, v, conf)(cm) c.endField(ValueField, 1) } c.endGroup() @@ -469,8 +493,10 @@ object ParquetField { class LogicalTypeWord[T](lta: => LogicalTypeAnnotation) extends Serializable { def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { - override def buildSchema(cm: CaseMapper): Type = Schema.setLogicalType(pf.schema(cm), lta) - override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + Schema.setLogicalType(pf.schema(cm, conf), lta) + override def write(c: RecordConsumer, v: U, conf: Configuration)(cm: CaseMapper): Unit = + pf.write(c, g(v), conf)(cm) override def newConverter(writerSchema: Type): TypeConverter[U] = pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f) @@ -504,14 +530,16 @@ object ParquetField { ) new Primitive[BigDecimal] { - override def buildSchema(cm: CaseMapper): Type = + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = Schema.primitive( PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, LogicalTypeAnnotation.decimalType(scale, precision), length ) - override def write(c: RecordConsumer, v: BigDecimal)(cm: CaseMapper): Unit = + override def write(c: RecordConsumer, v: BigDecimal, conf: Configuration)( + cm: CaseMapper + ): Unit = c.addBinary(Binary.fromConstantByteArray(Decimal.toFixed(v, precision, scale, length))) override def newConverter(writerSchema: Type): TypeConverter[BigDecimal] = @@ -535,10 +563,10 @@ object ParquetField { logicalType[String](LogicalTypeAnnotation.enumType())(et.from)(et.to) implicit val ptUuid: Primitive[UUID] = new Primitive[UUID] { - override def buildSchema(cm: CaseMapper): Type = + override def buildSchema(cm: CaseMapper, conf: Configuration): Type = Schema.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, length = 16) - override def write(c: RecordConsumer, v: UUID)(cm: CaseMapper): Unit = + override def write(c: RecordConsumer, v: UUID, conf: Configuration)(cm: CaseMapper): Unit = c.addBinary( Binary.fromConstantByteArray( ByteBuffer diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala index 829bfcd58..8f9760f0a 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala @@ -17,6 +17,7 @@ package magnolify.parquet import magnolify.shared.{Converter => _, _} +import magnolify.parquet.MagnolifyParquetProperties._ import org.apache.avro.{Schema => AvroSchema} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job @@ -37,11 +38,20 @@ import org.typelevel.scalaccompat.annotation.nowarn sealed trait ParquetArray /** - * Add `import magnolify.parquet.ParquetArray.AvroCompat._` to generate AVRO schema on write + * Add `import magnolify.parquet.ParquetArray.AvroCompat._` to generate Avro-compatible + * array schemas. This import is DEPRECATED. 
Instead, pass the following option to your Parquet + * Configuration: + * + * magnolify.parquet.write-grouped-arrays: true */ object ParquetArray { implicit case object default extends ParquetArray + @deprecated( + message = + "AvroCompat import is deprecated; set Parquet Configuration option `magnolify.parquet.write-grouped-arrays: true` instead", + since = "0.8.0" + ) object AvroCompat { implicit case object avroCompat extends ParquetArray } @@ -50,9 +60,10 @@ object ParquetArray { sealed trait ParquetType[T] extends Serializable { import ParquetType._ - def schema: MessageType - def avroSchema: AvroSchema - val avroCompat: Boolean + def schema: MessageType = schema(None) + def schema(conf: Option[Configuration]): MessageType + def avroSchema: AvroSchema = avroSchema(None) + def avroSchema(conf: Option[Configuration]): AvroSchema def setupInput(job: Job): Unit = { job.setInputFormatClass(classOf[ParquetInputFormat[T]]) @@ -72,8 +83,8 @@ sealed trait ParquetType[T] extends Serializable { def readBuilder(file: InputFile): ReadBuilder[T] = new ReadBuilder(file, readSupport) def writeBuilder(file: OutputFile): WriteBuilder[T] = new WriteBuilder(file, writeSupport) - def write(c: RecordConsumer, v: T): Unit = () - def newConverter(writerSchema: Type): TypeConverter[T] = null + private[parquet] def write(c: RecordConsumer, v: T, conf: Configuration): Unit = () + private[parquet] def newConverter(writerSchema: Type): TypeConverter[T] = null } object ParquetType { @@ -87,28 +98,24 @@ object ParquetType { )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match { case r: ParquetField.Record[_] => new ParquetType[T] { - @transient override lazy val schema: MessageType = Schema.message(r.schema(cm)) - @transient override lazy val avroSchema: AvroSchema = { - val s = new AvroSchemaConverter().convert(schema) + @transient override def schema(conf: Option[Configuration]): MessageType = + Schema.message(r.schema(cm, conf.getOrElse(new Configuration()))) + @transient override def avroSchema(conf: Option[Configuration]): AvroSchema = { + val s = new AvroSchemaConverter().convert(schema(conf)) // add doc to avro schema val fieldDocs = f.fieldDocs(cm) SchemaUtil.deepCopy(s, f.typeDoc, fieldDocs.get) } - override val avroCompat: Boolean = - pa == ParquetArray.AvroCompat.avroCompat || f.hasAvroArray - - override def write(c: RecordConsumer, v: T): Unit = r.write(c, v)(cm) - override def newConverter(writerSchema: Type): TypeConverter[T] = + override def write(c: RecordConsumer, v: T, conf: Configuration): Unit = + r.write(c, v, conf)(cm) + override private[parquet] def newConverter(writerSchema: Type): TypeConverter[T] = r.newConverter(writerSchema) } case _ => throw new IllegalArgumentException(s"ParquetType can only be created from Record. 
Got $f") } - val ReadTypeKey = "parquet.type.read.type" - val WriteTypeKey = "parquet.type.write.type" - class ReadBuilder[T](file: InputFile, val readSupport: ReadSupport[T]) extends ParquetReader.Builder[T](file) { override def getReadSupport: ReadSupport[T] = readSupport @@ -122,7 +129,6 @@ object ParquetType { // From AvroReadSupport private val AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema" - private val OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema" class ReadSupport[T](private var parquetType: ParquetType[T]) extends hadoop.ReadSupport[T] { def this() = this(null) @@ -135,29 +141,13 @@ object ParquetType { parquetType = SerializationUtils.fromBase64[ParquetType[T]](readKeyType) } - val metadata = context.getKeyValueMetadata - val model = metadata.get(ParquetWriter.OBJECT_MODEL_NAME_PROP) - val isAvroFile = (model != null && model.contains("avro")) || - metadata.containsKey(AVRO_SCHEMA_METADATA_KEY) || - metadata.containsKey(OLD_AVRO_SCHEMA_METADATA_KEY) - if (isAvroFile && !parquetType.avroCompat) { - logger.warn( - "Parquet file was written from Avro records, " + - "`import magnolify.parquet.ParquetArray.AvroCompat._` to read correctly" - ) - } - if (!isAvroFile && parquetType.avroCompat) { - logger.warn( - "Parquet file was not written from Avro records, " + - "remove `import magnolify.parquet.ParquetArray.AvroCompat._` to read correctly" - ) - } - val requestedSchema = { - val s = Schema.message(parquetType.schema) + val s = Schema.message(parquetType.schema(Some(context.getConfiguration))): @nowarn( + "cat=deprecation" + ) // If reading Avro, roundtrip schema using parquet-avro converter to ensure array compatibility; // magnolify-parquet does not automatically wrap repeated fields into a group like parquet-avro does - if (isAvroFile) { + if (Schema.hasGroupedArray(context.getFileSchema)) { val converter = new AvroSchemaConverter() converter.convert(converter.convert(s)) } else { @@ -165,10 +155,7 @@ object ParquetType { } } Schema.checkCompatibility(context.getFileSchema, requestedSchema) - new hadoop.ReadSupport.ReadContext( - requestedSchema, - java.util.Collections.emptyMap() - ) + new hadoop.ReadSupport.ReadContext(requestedSchema, java.util.Collections.emptyMap()) } override def prepareForRead( @@ -190,26 +177,40 @@ object ParquetType { override def getName: String = "magnolify" private var recordConsumer: RecordConsumer = null + private var conf: Configuration = null override def init(configuration: Configuration): hadoop.WriteSupport.WriteContext = { if (parquetType == null) { parquetType = SerializationUtils.fromBase64[ParquetType[T]](configuration.get(WriteTypeKey)) } - val schema = Schema.message(parquetType.schema) + val schema = Schema.message(parquetType.schema(Some(configuration))) val metadata = new java.util.HashMap[String, String]() - if (parquetType.avroCompat) { - // This overrides `WriteSupport#getName` - metadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, "avro") - metadata.put(AVRO_SCHEMA_METADATA_KEY, parquetType.avroSchema.toString()) - } else { - logger.warn( - "Parquet file is being written with no avro compatibility, this mode is not " + - "producing schema. 
Add `import magnolify.parquet.ParquetArray.AvroCompat._` to " + - "generate schema" + + if ( + configuration.getBoolean( + MagnolifyParquetProperties.WriteAvroSchemaToMetadata, + MagnolifyParquetProperties.WriteAvroSchemaToMetadataDefault ) + ) { + try { + metadata.put( + AVRO_SCHEMA_METADATA_KEY, + parquetType.avroSchema(Some(configuration)).toString() + ) + } catch { + // parquet-avro has greater schema restrictions than magnolify-parquet, e.g., parquet-avro does not + // support Maps with non-Binary key types + case e: IllegalArgumentException => + logger.warn( + s"Writer schema `$schema` contains a type not supported by Avro schemas; will not write " + + s"key $AVRO_SCHEMA_METADATA_KEY to file metadata", + e + ) + } } + this.conf = configuration new hadoop.WriteSupport.WriteContext(schema, metadata) } @@ -218,7 +219,7 @@ object ParquetType { override def write(record: T): Unit = { recordConsumer.startMessage() - parquetType.write(recordConsumer, record) + parquetType.write(recordConsumer, record, conf) recordConsumer.endMessage() } } diff --git a/parquet/src/main/scala/magnolify/parquet/Predicate.scala b/parquet/src/main/scala/magnolify/parquet/Predicate.scala index a1e0ce927..e902560d4 100644 --- a/parquet/src/main/scala/magnolify/parquet/Predicate.scala +++ b/parquet/src/main/scala/magnolify/parquet/Predicate.scala @@ -17,6 +17,7 @@ package magnolify.parquet import magnolify.shared.CaseMapper +import org.apache.hadoop.conf.Configuration import org.apache.parquet.filter2.predicate.Operators.Column import org.apache.parquet.filter2.predicate.{ FilterApi, @@ -52,7 +53,8 @@ object Predicate { )(filterFn: ScalaFieldT => Boolean)(implicit pf: ParquetField.Primitive[ScalaFieldT] ): FilterPredicate = { - val fieldType = pf.schema(CaseMapper.identity).asPrimitiveType().getPrimitiveTypeName + val fieldType = + pf.schema(CaseMapper.identity, new Configuration()).asPrimitiveType().getPrimitiveTypeName val column = fieldType match { case PrimitiveTypeName.INT32 => FilterApi.intColumn(fieldName) @@ -65,7 +67,7 @@ object Predicate { } def wrap[T](addFn: (PrimitiveConverter, T) => Unit): T => ScalaFieldT = { - lazy val converter = pf.newConverter(pf.schema(CaseMapper.identity)) + lazy val converter = pf.newConverter(pf.schema(CaseMapper.identity, new Configuration())) value => { addFn(converter.asPrimitiveConverter(), value) converter.get diff --git a/parquet/src/main/scala/magnolify/parquet/Schema.scala b/parquet/src/main/scala/magnolify/parquet/Schema.scala index e58fae85b..b84dcf69e 100644 --- a/parquet/src/main/scala/magnolify/parquet/Schema.scala +++ b/parquet/src/main/scala/magnolify/parquet/Schema.scala @@ -124,7 +124,9 @@ private object Schema { !isRepetitionBackwardCompatible(writer, reader) || writer.isPrimitive != reader.isPrimitive ) { - throw new InvalidRecordException(s"$writer found: expected $reader") + throw new InvalidRecordException( + s"Writer schema `$writer` incompatible with reader schema `$reader``" + ) } writer match { diff --git a/parquet/src/test/scala/magnolify/parquet/AvroParquetSuite.scala b/parquet/src/test/scala/magnolify/parquet/AvroParquetSuite.scala index 440bb96d9..bf11f1845 100644 --- a/parquet/src/test/scala/magnolify/parquet/AvroParquetSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/AvroParquetSuite.scala @@ -24,7 +24,6 @@ import magnolify.avro.AvroType import magnolify.shared.{doc, CaseMapper} import magnolify.avro.unsafe._ import magnolify.parquet.unsafe._ -import magnolify.parquet.ParquetArray.AvroCompat._ import 
magnolify.parquet.util.AvroSchemaComparer import magnolify.scalacheck.auto._ import magnolify.scalacheck.TestArbitrary._ @@ -41,10 +40,14 @@ import org.apache.parquet.avro.{ } import org.scalacheck._ +import scala.annotation.nowarn import scala.reflect.ClassTag +@nowarn("cat=deprecation") // Suppress warnings from importing AvroCompat class AvroParquetSuite extends MagnolifySuite { + import magnolify.parquet.ParquetArray.AvroCompat._ + private def test[T: Arbitrary: ClassTag]()(implicit at: AvroType[T], tpe: ParquetType[T], @@ -77,7 +80,8 @@ class AvroParquetSuite extends MagnolifySuite { val r = at(t) val out = new TestOutputFile - val writer = AvroParquetWriter.builder[GenericRecord](out).withSchema(at.schema).build() + val writer = + AvroParquetWriter.builder[GenericRecord](out).withSchema(at.schema).build() writer.write(r) writer.close() diff --git a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala index 275e406f3..56d4fdba9 100644 --- a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala @@ -28,12 +28,11 @@ import magnolify.shared.TestEnumType._ import magnolify.test.Simple._ import magnolify.test._ import org.apache.hadoop.conf.Configuration +import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.{api => hadoop} import org.apache.parquet.io._ -import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.parquet.schema.Type.Repetition -import org.apache.parquet.schema.Types import org.scalacheck._ import java.io.ByteArrayInputStream @@ -201,102 +200,55 @@ class ParquetTypeSuite extends MagnolifySuite { test(s"AvroCompat") { val pt = ParquetType[WithList] // Assert that by default, Magnolify doesn't wrap repeated fields in group types - assert(!Schema.hasGroupedArray(pt.schema)) - assertEquals( - """|message magnolify.parquet.WithList { - | required binary s (STRING); - | repeated binary l (STRING); - |} - |""".stripMargin, - pt.schema.toString - ) - - // Construct Magnolify read support for a Parquet file written using parquet-avro - val readSupport = pt.readSupport.init( - new InitContext( - new Configuration(), - Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava, - // Build expected writer schema, which uses Avro array grouping - Types - .buildMessage() - .addField(Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("s")) - .addField( - Types - .buildGroup(Repetition.REQUIRED) - .addField( - Types - .primitive(PrimitiveTypeName.BINARY, Repetition.REPEATED) - .named("array") - ) - .named("list") - ) - .named("WithList") - ) - ) - - // Assert that Magnolify is using a compatible array schema to read - assert(Schema.hasGroupedArray(readSupport.getRequestedSchema)) - assertEquals( - """|message magnolify.parquet.WithList { - | required binary s (STRING); - | required group l (LIST) { - | repeated binary array (STRING); - | } - |} - |""".stripMargin, - readSupport.getRequestedSchema.toString - ) - } - - test(s"AvroCompat") { - val pt = ParquetType[WithList] + val nonAvroCompliantSchema = """|message magnolify.parquet.WithList { + | required binary s (STRING); + | repeated binary l (STRING); + |} + |""".stripMargin - // Assert that by default, Magnolify doesn't wrap repeated fields in group types assert(!Schema.hasGroupedArray(pt.schema)) - 
assertEquals( - """|message magnolify.parquet.WithList { - | required binary s (STRING); - | repeated binary l (STRING); - |} - |""".stripMargin, - pt.schema.toString - ) + assertEquals(nonAvroCompliantSchema, pt.schema.toString) - // Construct Magnolify read support for a Parquet file written using parquet-avro + // Assert that ReadSupport converts non-grouped arrays to grouped arrays depending on writer schema + val asc = new AvroSchemaConverter() val readSupport = pt.readSupport.init( - new InitContext( + new hadoop.InitContext( new Configuration(), Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava, - // Build expected writer schema, which uses parquet-avro-style array grouping - Types - .buildMessage() - .addField(Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("s")) - .addField( - Types - .buildGroup(Repetition.REQUIRED) - .addField( - Types - .primitive(PrimitiveTypeName.BINARY, Repetition.REPEATED) - .named("array") - ) - .named("list") - ) - .named("WithList") + asc.convert( + asc.convert(pt.schema) + ) // Use converted Avro-compliant schema, which groups lists ) ) - // Assert that Magnolify is using a compatible array schema to read + val avroCompliantSchema = """|message magnolify.parquet.WithList { + | required binary s (STRING); + | required group l (LIST) { + | repeated binary array (STRING); + | } + |} + |""".stripMargin + assert(Schema.hasGroupedArray(readSupport.getRequestedSchema)) - assertEquals( - """|message magnolify.parquet.WithList { - | required binary s (STRING); - | required group l (LIST) { - | repeated binary array (STRING); - | } - |} - |""".stripMargin, - readSupport.getRequestedSchema.toString - ) + assertEquals(avroCompliantSchema, readSupport.getRequestedSchema.toString) + + // Assert that WriteSupport uses grouped schema when explicitly configured + { + val conf = new Configuration() + conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, true) + val writeContext = pt.writeSupport.init(conf) + assertEquals(avroCompliantSchema, writeContext.getSchema.toString) + assertEquals(pt.schema(Some(conf)), writeContext.getSchema) + } + + // Assert that WriteSupport uses non-grouped schema otherwise + { + val conf = new Configuration() + conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, false) + val writeContext = pt.writeSupport.init(conf) + assertEquals(nonAvroCompliantSchema, writeContext.getSchema.toString) + assertEquals(pt.schema(Some(conf)), writeContext.getSchema) + } } } diff --git a/parquet/src/test/scala/magnolify/parquet/SchemaEvolutionSuite.scala b/parquet/src/test/scala/magnolify/parquet/SchemaEvolutionSuite.scala index 9d279d8e9..7d5aab8e5 100644 --- a/parquet/src/test/scala/magnolify/parquet/SchemaEvolutionSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/SchemaEvolutionSuite.scala @@ -294,6 +294,7 @@ object SchemaEvolutionSuite { } @nowarn("msg=Unused import") +@nowarn("cat=deprecation") // Suppress warnings from importing AvroCompat class SchemaEvolutionSuite extends MagnolifySuite { import SchemaEvolutionSuite._ From a73071890bed364bcec97d355692af02a63f64ac Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 12 Sep 2024 13:55:28 -0400 Subject: [PATCH 04/12] fix for scala 2.12 --- .../magnolify/parquet/MagnolifyParquetProperties.scala | 10 +++++----- .../main/scala/magnolify/parquet/ParquetField.scala | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala 
b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala index e3c1824e2..7f5768166 100644 --- a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala +++ b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala @@ -18,8 +18,10 @@ package magnolify.parquet import org.apache.hadoop.conf.Configuration -import java.util.Objects - +/** + * Properties for reading and writing Magnolify ParquetType classes, + * configurable via a Hadoop [[Configuration]] instance. + */ object MagnolifyParquetProperties { val WriteGroupedArrays: String = "magnolify.parquet.write-grouped-arrays" val WriteGroupedArraysDefault: Boolean = false @@ -32,8 +34,6 @@ object MagnolifyParquetProperties { // Hash any Configuration values that might affect schema creation to use as part of Schema cache key private[parquet] def hashValues(conf: Configuration): Int = { - Objects.hash( - Option(conf.get(WriteGroupedArrays)).map(_.toBoolean).getOrElse(WriteGroupedArraysDefault) - ) + Option(conf.get(WriteGroupedArrays)).map(_.toBoolean).getOrElse(WriteGroupedArraysDefault).hashCode() } } diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala index e9585bd8e..670b21ac8 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala @@ -29,9 +29,9 @@ import org.apache.parquet.io.api._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types} -import org.typelevel.scalaccompat.annotation.nowarn -import scala.annotation.implicitNotFound +import scala.annotation.{implicitNotFound, nowarn} +import scala.collection.compat._ import scala.collection.concurrent sealed trait ParquetField[T] extends Serializable { From 5e622ab5cc7943d31f48f51ed2c351f337612cdb Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 12 Sep 2024 14:09:02 -0400 Subject: [PATCH 05/12] scalafmt --- .../parquet/MagnolifyParquetProperties.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala index 7f5768166..34a51a91d 100644 --- a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala +++ b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala @@ -19,8 +19,8 @@ package magnolify.parquet import org.apache.hadoop.conf.Configuration /** - * Properties for reading and writing Magnolify ParquetType classes, - * configurable via a Hadoop [[Configuration]] instance. + * Properties for reading and writing Magnolify ParquetType classes, configurable via a Hadoop + * [[Configuration]] instance. 
*/ object MagnolifyParquetProperties { val WriteGroupedArrays: String = "magnolify.parquet.write-grouped-arrays" @@ -33,7 +33,9 @@ object MagnolifyParquetProperties { val WriteTypeKey = "parquet.type.write.type" // Hash any Configuration values that might affect schema creation to use as part of Schema cache key - private[parquet] def hashValues(conf: Configuration): Int = { - Option(conf.get(WriteGroupedArrays)).map(_.toBoolean).getOrElse(WriteGroupedArraysDefault).hashCode() - } + private[parquet] def hashValues(conf: Configuration): Int = + Option(conf.get(WriteGroupedArrays)) + .map(_.toBoolean) + .getOrElse(WriteGroupedArraysDefault) + .hashCode() } From db5b1b0020936e91833e8a017449b6d696178777 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 12 Sep 2024 16:04:42 -0400 Subject: [PATCH 06/12] fix tools test --- build.sbt | 1 + tools/src/test/scala/magnolify/tools/ParquetParserSuite.scala | 2 ++ 2 files changed, 3 insertions(+) diff --git a/build.sbt b/build.sbt index 068b2c94f..f4ed68c96 100644 --- a/build.sbt +++ b/build.sbt @@ -693,6 +693,7 @@ lazy val tools = project "com.google.apis" % "google-api-services-bigquery" % bigqueryVersion, "org.apache.avro" % "avro" % avroVersion % Provided, "org.apache.parquet" % "parquet-hadoop" % parquetVersion, + "org.apache.hadoop" % "hadoop-common" % hadoopVersion, "org.typelevel" %% "paiges-core" % paigesVersion ) ) diff --git a/tools/src/test/scala/magnolify/tools/ParquetParserSuite.scala b/tools/src/test/scala/magnolify/tools/ParquetParserSuite.scala index e4505ad8f..676ba33bd 100644 --- a/tools/src/test/scala/magnolify/tools/ParquetParserSuite.scala +++ b/tools/src/test/scala/magnolify/tools/ParquetParserSuite.scala @@ -20,8 +20,10 @@ import magnolify.parquet._ import magnolify.test._ import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, OffsetTime} +import scala.annotation.nowarn import scala.reflect.ClassTag +@nowarn("cat=deprecation") // Suppress warnings from importing AvroCompat class ParquetParserSuite extends MagnolifySuite { import ParquetParserSuite._ From be75cc34a28fc0e3a3bf5eca247159f0b760637a Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 12 Sep 2024 16:50:26 -0400 Subject: [PATCH 07/12] drop Optional --- .../scala/magnolify/parquet/ParquetType.scala | 21 ++++++++++--------- .../magnolify/parquet/ParquetTypeSuite.scala | 4 ++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala index 8f9760f0a..b2b179dfb 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala @@ -60,10 +60,11 @@ object ParquetArray { sealed trait ParquetType[T] extends Serializable { import ParquetType._ - def schema: MessageType = schema(None) - def schema(conf: Option[Configuration]): MessageType - def avroSchema: AvroSchema = avroSchema(None) - def avroSchema(conf: Option[Configuration]): AvroSchema + def schema: MessageType = schema(new Configuration()) + def schema(conf: Configuration): MessageType + + def avroSchema: AvroSchema = avroSchema(new Configuration()) + def avroSchema(conf: Configuration): AvroSchema def setupInput(job: Job): Unit = { job.setInputFormatClass(classOf[ParquetInputFormat[T]]) @@ -98,9 +99,9 @@ object ParquetType { )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match { case r: ParquetField.Record[_] => new ParquetType[T] { - @transient override def schema(conf: 
Option[Configuration]): MessageType = - Schema.message(r.schema(cm, conf.getOrElse(new Configuration()))) - @transient override def avroSchema(conf: Option[Configuration]): AvroSchema = { + @transient override def schema(conf: Configuration): MessageType = + Schema.message(r.schema(cm, conf)) + @transient override def avroSchema(conf: Configuration): AvroSchema = { val s = new AvroSchemaConverter().convert(schema(conf)) // add doc to avro schema val fieldDocs = f.fieldDocs(cm) @@ -142,7 +143,7 @@ object ParquetType { } val requestedSchema = { - val s = Schema.message(parquetType.schema(Some(context.getConfiguration))): @nowarn( + val s = Schema.message(parquetType.schema(context.getConfiguration)): @nowarn( "cat=deprecation" ) // If reading Avro, roundtrip schema using parquet-avro converter to ensure array compatibility; @@ -184,7 +185,7 @@ object ParquetType { parquetType = SerializationUtils.fromBase64[ParquetType[T]](configuration.get(WriteTypeKey)) } - val schema = Schema.message(parquetType.schema(Some(configuration))) + val schema = Schema.message(parquetType.schema(configuration)) val metadata = new java.util.HashMap[String, String]() if ( @@ -196,7 +197,7 @@ object ParquetType { try { metadata.put( AVRO_SCHEMA_METADATA_KEY, - parquetType.avroSchema(Some(configuration)).toString() + parquetType.avroSchema(configuration).toString() ) } catch { // parquet-avro has greater schema restrictions than magnolify-parquet, e.g., parquet-avro does not diff --git a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala index 56d4fdba9..593155919 100644 --- a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala @@ -238,7 +238,7 @@ class ParquetTypeSuite extends MagnolifySuite { conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, true) val writeContext = pt.writeSupport.init(conf) assertEquals(avroCompliantSchema, writeContext.getSchema.toString) - assertEquals(pt.schema(Some(conf)), writeContext.getSchema) + assertEquals(pt.schema(conf), writeContext.getSchema) } // Assert that WriteSupport uses non-grouped schema otherwise @@ -247,7 +247,7 @@ class ParquetTypeSuite extends MagnolifySuite { conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, false) val writeContext = pt.writeSupport.init(conf) assertEquals(nonAvroCompliantSchema, writeContext.getSchema.toString) - assertEquals(pt.schema(Some(conf)), writeContext.getSchema) + assertEquals(pt.schema(conf), writeContext.getSchema) } } } From 56ff232c1b92161d9d1503f8392d74d834f7db7f Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 17 Sep 2024 12:52:43 -0400 Subject: [PATCH 08/12] add JMH benchmark for Parquet --- build.sbt | 8 +- .../scala/magnolify/jmh/MagnolifyBench.scala | 79 +++++++++++++++++-- 2 files changed, 81 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index f4ed68c96..3d9ec239c 100644 --- a/build.sbt +++ b/build.sbt @@ -708,6 +708,7 @@ lazy val jmh: Project = project cats % Test, datastore % Test, guava % Test, + parquet % "test->test", protobuf % "test->test", scalacheck % Test, tensorflow % Test, @@ -727,7 +728,12 @@ lazy val jmh: Project = project "com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test, "com.google.cloud.datastore" % "datastore-v1-proto-client" % datastoreVersion % Test, "org.apache.avro" % "avro" % avroVersion % Test, - "org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test + 
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test, + "org.apache.parquet" % "parquet-avro" % parquetVersion % Test, + "org.apache.parquet" % "parquet-column" % parquetVersion % Test, + "org.apache.parquet" % "parquet-hadoop" % parquetVersion % Test, + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test, + "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Test ) ) diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala index ccf745160..05bd61159 100644 --- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala +++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala @@ -16,12 +16,18 @@ package magnolify.jmh +import magnolify.jmh.MagnolifyBench.nested +import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile} + import java.util.concurrent.TimeUnit +import magnolify.scalacheck.auto.* +import magnolify.test.Simple.* +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter} +import org.scalacheck.* +import org.openjdk.jmh.annotations.* -import magnolify.scalacheck.auto._ -import magnolify.test.Simple._ -import org.scalacheck._ -import org.openjdk.jmh.annotations._ +import scala.jdk.CollectionConverters.* object MagnolifyBench { val seed: rng.Seed = rng.Seed(0) @@ -87,6 +93,69 @@ class AvroBench { @Benchmark def avroSchema: Schema = AvroType[Nested].schema } +@State(Scope.Benchmark) +class ParquetReadState()(implicit pt: ParquetType[Nested]) { + var out: TestOutputFile = null + var reader: ParquetReader[Nested] = null + + @Setup(Level.Invocation) + def setup(): Unit = { + out = new TestOutputFile + val writer = pt.writeBuilder(out).build() + writer.write(nested) + writer.close() + + val in = new TestInputFile(out.getBytes) + reader = pt.readBuilder(in).build() + } + + @TearDown(Level.Invocation) + def tearDown(): Unit = { + reader.close() + } +} + +@State(Scope.Benchmark) +class ParquetWriteState()(implicit pt: ParquetType[Nested]) { + var writer: ParquetWriter[Nested] = null + + @Setup(Level.Invocation) + def setup(): Unit = { + val out = new TestOutputFile + writer = pt.writeBuilder(out).build() + } + + @TearDown(Level.Invocation) + def tearDown(): Unit = { + writer.close() + } +} + + +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Thread) +class ParquetBench { + import MagnolifyBench._ + implicit val pt: ParquetType[Nested] = ParquetType[Nested] + + @Benchmark def parquetWrite(state: ParquetWriteState): Unit = state.writer.write(nested) + @Benchmark def parquetRead(state: ParquetReadState): Nested = state.reader.read() +} + +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Thread) +class ParquetAvroCompatBench { + import MagnolifyBench._ + val conf = new Configuration() + conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, MagnolifyParquetProperties.WriteGroupedArraysDefault) + implicit val pt: ParquetType[Nested] = ParquetType[Nested](conf) + + @Benchmark def parquetWrite(state: ParquetWriteState): Unit = state.writer.write(nested) + @Benchmark def parquetRead(state: ParquetReadState): Nested = state.reader.read() +} + @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Thread) @@ -157,7 +226,7 @@ class ExampleBench { private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get private val example = 
exampleType.to(exampleNested).build() @Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested) - @Benchmark def exampleFrom: ExampleNested = exampleType.from(example) + @Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap) } // Collections are not supported From ca535fc6838facf925e3b02fcaf06f32eb0c8b36 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 17 Sep 2024 16:12:49 -0400 Subject: [PATCH 09/12] [wip] Refactor ParquetType to pass Configuration at instantiation time --- .../scala/magnolify/jmh/MagnolifyBench.scala | 38 +++-- .../magnolify/parquet/ParquetField.scala | 146 ++++++++++-------- .../scala/magnolify/parquet/ParquetType.scala | 59 ++++--- .../scala/magnolify/parquet/Predicate.scala | 9 +- .../magnolify/parquet/ParquetTypeSuite.scala | 42 +++-- 5 files changed, 170 insertions(+), 124 deletions(-) diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala index 05bd61159..89d0a20d7 100644 --- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala +++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala @@ -20,14 +20,14 @@ import magnolify.jmh.MagnolifyBench.nested import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile} import java.util.concurrent.TimeUnit -import magnolify.scalacheck.auto.* -import magnolify.test.Simple.* +import magnolify.scalacheck.auto._ +import magnolify.test.Simple._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter} -import org.scalacheck.* -import org.openjdk.jmh.annotations.* +import org.scalacheck._ +import org.openjdk.jmh.annotations._ -import scala.jdk.CollectionConverters.* +import scala.jdk.CollectionConverters._ object MagnolifyBench { val seed: rng.Seed = rng.Seed(0) @@ -94,7 +94,7 @@ class AvroBench { } @State(Scope.Benchmark) -class ParquetReadState()(implicit pt: ParquetType[Nested]) { +class ParquetReadState(pt: ParquetType[Nested]) { var out: TestOutputFile = null var reader: ParquetReader[Nested] = null @@ -116,7 +116,7 @@ class ParquetReadState()(implicit pt: ParquetType[Nested]) { } @State(Scope.Benchmark) -class ParquetWriteState()(implicit pt: ParquetType[Nested]) { +class ParquetWriteState(pt: ParquetType[Nested]) { var writer: ParquetWriter[Nested] = null @Setup(Level.Invocation) @@ -131,16 +131,27 @@ class ParquetWriteState()(implicit pt: ParquetType[Nested]) { } } +object ParquetStates { + def confWithGroupedArraysProp(propValue: Boolean): Configuration = { + val conf = new Configuration() + conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue) + conf + } + class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false))) + class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false))) + + class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true))) + class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true))) +} @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Thread) class ParquetBench { import MagnolifyBench._ - implicit val pt: ParquetType[Nested] = ParquetType[Nested] - @Benchmark def parquetWrite(state: ParquetWriteState): Unit = state.writer.write(nested) - @Benchmark def parquetRead(state: ParquetReadState): Nested = 
state.reader.read() + @Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested) + @Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read() } @BenchmarkMode(Array(Mode.AverageTime)) @@ -148,12 +159,9 @@ class ParquetBench { @State(Scope.Thread) class ParquetAvroCompatBench { import MagnolifyBench._ - val conf = new Configuration() - conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, MagnolifyParquetProperties.WriteGroupedArraysDefault) - implicit val pt: ParquetType[Nested] = ParquetType[Nested](conf) - @Benchmark def parquetWrite(state: ParquetWriteState): Unit = state.writer.write(nested) - @Benchmark def parquetRead(state: ParquetReadState): Nested = state.reader.read() + @Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested) + @Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read() } @BenchmarkMode(Array(Mode.AverageTime)) diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala index 670b21ac8..060461e0e 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala @@ -23,7 +23,6 @@ import magnolify.shims.FactoryCompat import java.nio.{ByteBuffer, ByteOrder} import java.time.LocalDate import java.util.UUID -import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.io.api._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -36,40 +35,44 @@ import scala.collection.concurrent sealed trait ParquetField[T] extends Serializable { - @transient private lazy val schemaCache: concurrent.Map[Int, concurrent.Map[UUID, Type]] = + @transient private lazy val schemaCache: concurrent.Map[Boolean, concurrent.Map[UUID, Type]] = concurrent.TrieMap.empty - protected def buildSchema(cm: CaseMapper, conf: Configuration): Type + protected def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type - def schema(cm: CaseMapper, conf: Configuration): Type = { - val confHash = MagnolifyParquetProperties.hashValues(conf) - - if (!schemaCache.contains(confHash)) { - schemaCache.put(confHash, concurrent.TrieMap.empty) + def schema( + cm: CaseMapper, + groupArrayFields: Boolean + ): Type = { + if (!schemaCache.contains(groupArrayFields)) { + schemaCache.put(groupArrayFields, concurrent.TrieMap.empty) } - schemaCache(confHash).getOrElseUpdate( + schemaCache(groupArrayFields).getOrElseUpdate( cm.uuid, - buildSchema(cm, Option(conf).getOrElse(new Configuration())) + buildSchema(cm, groupArrayFields) ) } def fieldDocs(cm: CaseMapper): Map[String, String] def typeDoc: Option[String] - protected def isGroup(conf: Configuration): Boolean = false + protected def isGroup(groupArrayFields: Boolean): Boolean = false protected def isEmpty(v: T): Boolean protected final def nonEmpty(v: T): Boolean = !isEmpty(v) - def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit + def write(c: RecordConsumer, v: T)(cm: CaseMapper, groupArrayFields: Boolean): Unit def newConverter(writerSchema: Type): TypeConverter[T] - protected def writeGroup(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = { - val wrapGroup = isGroup(conf) + protected def writeGroup( + c: RecordConsumer, + v: T + )(cm: CaseMapper, groupArrayFields: Boolean): Unit = { + val 
wrapGroup = isGroup(groupArrayFields) if (wrapGroup) { c.startGroup() } - write(c, v, conf)(cm) + write(c, v)(cm, groupArrayFields) if (wrapGroup) { c.endGroup() } @@ -78,7 +81,7 @@ sealed trait ParquetField[T] extends Serializable { object ParquetField { sealed trait Record[T] extends ParquetField[T] { - override protected def isGroup(conf: Configuration): Boolean = true + override protected def isGroup(groupArrayFields: Boolean): Boolean = true override protected def isEmpty(v: T): Boolean = false } @@ -91,11 +94,14 @@ object ParquetField { val p = caseClass.parameters.head val tc = p.typeclass new ParquetField[T] { - override protected def buildSchema(cm: CaseMapper, conf: Configuration): Type = - tc.buildSchema(cm, conf) + override protected def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = + tc.buildSchema(cm, groupArrayFields) override protected def isEmpty(v: T): Boolean = tc.isEmpty(p.dereference(v)) - override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = - tc.writeGroup(c, p.dereference(v), conf)(cm) + override def write(c: RecordConsumer, v: T)( + cm: CaseMapper, + groupArrayFields: Boolean + ): Unit = + tc.writeGroup(c, p.dereference(v))(cm, groupArrayFields) override def newConverter(writerSchema: Type): TypeConverter[T] = { val buffered = tc .newConverter(writerSchema) @@ -109,10 +115,10 @@ object ParquetField { } } else { new Record[T] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = caseClass.parameters .foldLeft(Types.requiredGroup()) { (g, p) => - g.addField(Schema.rename(p.typeclass.schema(cm, conf), cm.map(p.label))) + g.addField(Schema.rename(p.typeclass.schema(cm, groupArrayFields), cm.map(p.label))) } .named(caseClass.typeName.full) @@ -139,13 +145,16 @@ object ParquetField { s"Type ${caseClass.typeName}" ) - override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = { + override def write( + c: RecordConsumer, + v: T + )(cm: CaseMapper, groupArrayFields: Boolean): Unit = { caseClass.parameters.foreach { p => val x = p.dereference(v) if (p.typeclass.nonEmpty(x)) { val name = cm.map(p.label) c.startField(name, p.index) - p.typeclass.writeGroup(c, x, conf)(cm) + p.typeclass.writeGroup(c, x)(cm, groupArrayFields) c.endField(name, p.index) } } @@ -202,9 +211,13 @@ object ParquetField { class FromWord[T] { def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = pf.schema(cm, conf) - override def write(c: RecordConsumer, v: U, conf: Configuration)(cm: CaseMapper): Unit = - pf.write(c, g(v), conf)(cm) + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = + pf.schema(cm, groupArrayFields) + override def write(c: RecordConsumer, v: U)( + cm: CaseMapper, + groupArrayFields: Boolean + ): Unit = + pf.write(c, g(v))(cm, groupArrayFields) override def newConverter(writerSchema: Type): TypeConverter[U] = pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f) @@ -228,9 +241,9 @@ object ParquetField { lta: => LogicalTypeAnnotation = null ): Primitive[T] = new Primitive[T] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = Schema.primitive(ptn, lta) - override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = + 
override def write(c: RecordConsumer, v: T)(cm: CaseMapper, groupArrayFields: Boolean): Unit = f(c)(v) override def newConverter(writerSchema: Type): TypeConverter[T] = g override type ParquetT = UnderlyingT @@ -297,18 +310,19 @@ object ParquetField { implicit def pfOption[T](implicit t: ParquetField[T]): ParquetField[Option[T]] = new ParquetField[Option[T]] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = - Schema.setRepetition(t.schema(cm, conf), Repetition.OPTIONAL) + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = + Schema.setRepetition(t.schema(cm, groupArrayFields), Repetition.OPTIONAL) override protected def isEmpty(v: Option[T]): Boolean = v.forall(t.isEmpty) override def fieldDocs(cm: CaseMapper): Map[String, String] = t.fieldDocs(cm) override val typeDoc: Option[String] = None - override def write(c: RecordConsumer, v: Option[T], conf: Configuration)( - cm: CaseMapper + override def write(c: RecordConsumer, v: Option[T])( + cm: CaseMapper, + groupArrayFields: Boolean ): Unit = - v.foreach(t.writeGroup(c, _, conf)(cm)) + v.foreach(t.writeGroup(c, _)(cm, groupArrayFields)) override def newConverter(writerSchema: Type): TypeConverter[Option[T]] = { val buffered = t @@ -332,14 +346,15 @@ object ParquetField { // Legacy compat with Magnolify <= 0.7; future versions will remove AvroCompat in favor of // Configuration-based approach @nowarn("cat=deprecation") - val groupAvroArrays: Boolean = pa match { + val avroCompatImported: Boolean = pa match { case ParquetArray.default => false case ParquetArray.AvroCompat.avroCompat => true } - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = { - val repeatedSchema = Schema.setRepetition(t.schema(cm, conf), Repetition.REPEATED) - if (isGroup(conf)) { + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = { + val repeatedSchema = + Schema.setRepetition(t.schema(cm, groupArrayFields), Repetition.REPEATED) + if (isGroup(groupArrayFields)) { Types .requiredGroup() .addField(Schema.rename(repeatedSchema, AvroArrayField)) @@ -350,21 +365,21 @@ object ParquetField { } } - override protected def isGroup(conf: Configuration): Boolean = - groupAvroArrays || conf.getBoolean( - MagnolifyParquetProperties.WriteGroupedArrays, - MagnolifyParquetProperties.WriteGroupedArraysDefault - ) + override protected def isGroup(groupArrayFields: Boolean): Boolean = + avroCompatImported || groupArrayFields override protected def isEmpty(v: C[T]): Boolean = v.forall(t.isEmpty) - override def write(c: RecordConsumer, v: C[T], conf: Configuration)(cm: CaseMapper): Unit = - if (isGroup(conf)) { + override def write( + c: RecordConsumer, + v: C[T] + )(cm: CaseMapper, groupArrayFields: Boolean): Unit = + if (isGroup(groupArrayFields)) { c.startField(AvroArrayField, 0) - v.foreach(t.writeGroup(c, _, conf)(cm)) + v.foreach(t.writeGroup(c, _)(cm, groupArrayFields)) c.endField(AvroArrayField, 0) } else { - v.foreach(t.writeGroup(c, _, conf)(cm)) + v.foreach(t.writeGroup(c, _)(cm, groupArrayFields)) } override def newConverter(writerSchema: Type): TypeConverter[C[T]] = { @@ -405,10 +420,10 @@ object ParquetField { pfValue: ParquetField[V] ): ParquetField[Map[K, V]] = { new ParquetField[Map[K, V]] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = { - val keySchema = Schema.rename(pfKey.schema(cm, conf), KeyField) + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = { + val keySchema = Schema.rename(pfKey.schema(cm, groupArrayFields), 
KeyField) require(keySchema.isRepetition(Repetition.REQUIRED), "Map key must be required") - val valueSchema = Schema.rename(pfValue.schema(cm, conf), ValueField) + val valueSchema = Schema.rename(pfValue.schema(cm, groupArrayFields), ValueField) val keyValue = Types .repeatedGroup() .addField(keySchema) @@ -427,8 +442,9 @@ object ParquetField { override val typeDoc: Option[String] = None - override def write(c: RecordConsumer, v: Map[K, V], conf: Configuration)( - cm: CaseMapper + override def write(c: RecordConsumer, v: Map[K, V])( + cm: CaseMapper, + groupArrayFields: Boolean ): Unit = { if (v.nonEmpty) { c.startGroup() @@ -436,11 +452,11 @@ object ParquetField { v.foreach { case (k, v) => c.startGroup() c.startField(KeyField, 0) - pfKey.writeGroup(c, k, conf)(cm) + pfKey.writeGroup(c, k)(cm, groupArrayFields) c.endField(KeyField, 0) if (pfValue.nonEmpty(v)) { c.startField(ValueField, 1) - pfValue.writeGroup(c, v, conf)(cm) + pfValue.writeGroup(c, v)(cm, groupArrayFields) c.endField(ValueField, 1) } c.endGroup() @@ -493,10 +509,10 @@ object ParquetField { class LogicalTypeWord[T](lta: => LogicalTypeAnnotation) extends Serializable { def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = - Schema.setLogicalType(pf.schema(cm, conf), lta) - override def write(c: RecordConsumer, v: U, conf: Configuration)(cm: CaseMapper): Unit = - pf.write(c, g(v), conf)(cm) + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = + Schema.setLogicalType(pf.schema(cm, groupArrayFields), lta) + override def write(c: RecordConsumer, v: U)(cm: CaseMapper, groupArrayFields: Boolean): Unit = + pf.write(c, g(v))(cm, groupArrayFields) override def newConverter(writerSchema: Type): TypeConverter[U] = pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f) @@ -530,15 +546,16 @@ object ParquetField { ) new Primitive[BigDecimal] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = Schema.primitive( PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, LogicalTypeAnnotation.decimalType(scale, precision), length ) - override def write(c: RecordConsumer, v: BigDecimal, conf: Configuration)( - cm: CaseMapper + override def write(c: RecordConsumer, v: BigDecimal)( + cm: CaseMapper, + groupArrayFields: Boolean ): Unit = c.addBinary(Binary.fromConstantByteArray(Decimal.toFixed(v, precision, scale, length))) @@ -563,10 +580,13 @@ object ParquetField { logicalType[String](LogicalTypeAnnotation.enumType())(et.from)(et.to) implicit val ptUuid: Primitive[UUID] = new Primitive[UUID] { - override def buildSchema(cm: CaseMapper, conf: Configuration): Type = + override def buildSchema(cm: CaseMapper, groupArrayFields: Boolean): Type = Schema.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, length = 16) - override def write(c: RecordConsumer, v: UUID, conf: Configuration)(cm: CaseMapper): Unit = + override def write( + c: RecordConsumer, + v: UUID + )(cm: CaseMapper, groupArrayFields: Boolean): Unit = c.addBinary( Binary.fromConstantByteArray( ByteBuffer diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala index b2b179dfb..c2a8dde8e 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala @@ -60,11 +60,8 @@ object ParquetArray { sealed 
trait ParquetType[T] extends Serializable { import ParquetType._ - def schema: MessageType = schema(new Configuration()) - def schema(conf: Configuration): MessageType - - def avroSchema: AvroSchema = avroSchema(new Configuration()) - def avroSchema(conf: Configuration): AvroSchema + def schema: MessageType + def avroSchema: AvroSchema def setupInput(job: Job): Unit = { job.setInputFormatClass(classOf[ParquetInputFormat[T]]) @@ -84,7 +81,9 @@ sealed trait ParquetType[T] extends Serializable { def readBuilder(file: InputFile): ReadBuilder[T] = new ReadBuilder(file, readSupport) def writeBuilder(file: OutputFile): WriteBuilder[T] = new WriteBuilder(file, writeSupport) - private[parquet] def write(c: RecordConsumer, v: T, conf: Configuration): Unit = () + protected def groupArrayFields: Boolean = MagnolifyParquetProperties.WriteGroupedArraysDefault + + private[parquet] def write(c: RecordConsumer, v: T): Unit = () private[parquet] def newConverter(writerSchema: Type): TypeConverter[T] = null } @@ -92,26 +91,48 @@ object ParquetType { private val logger = LoggerFactory.getLogger(this.getClass) implicit def apply[T](implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = - ParquetType(CaseMapper.identity) + ParquetType(CaseMapper.identity, new Configuration()) def apply[T]( cm: CaseMapper + )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = + ParquetType[T](cm, new Configuration())(f, pa) + + def apply[T]( + conf: Configuration + )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = + ParquetType[T](CaseMapper.identity, conf)(f, pa) + + def apply[T]( + cm: CaseMapper, + conf: Configuration )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match { case r: ParquetField.Record[_] => new ParquetType[T] { - @transient override def schema(conf: Configuration): MessageType = - Schema.message(r.schema(cm, conf)) - @transient override def avroSchema(conf: Configuration): AvroSchema = { - val s = new AvroSchemaConverter().convert(schema(conf)) + @transient override def schema: MessageType = + Schema.message(r.schema(cm, groupArrayFields)) + + @transient override def avroSchema: AvroSchema = { + val s = new AvroSchemaConverter().convert(schema) // add doc to avro schema val fieldDocs = f.fieldDocs(cm) SchemaUtil.deepCopy(s, f.typeDoc, fieldDocs.get) } - override def write(c: RecordConsumer, v: T, conf: Configuration): Unit = - r.write(c, v, conf)(cm) + override def write(c: RecordConsumer, v: T): Unit = + r.write(c, v)(cm, groupArrayFields) override private[parquet] def newConverter(writerSchema: Type): TypeConverter[T] = r.newConverter(writerSchema) + + @nowarn("cat=deprecation") + override protected val groupArrayFields: Boolean = pa match { + case ParquetArray.AvroCompat.avroCompat => true + case ParquetArray.default => + conf.getBoolean( + MagnolifyParquetProperties.WriteGroupedArrays, + MagnolifyParquetProperties.WriteGroupedArraysDefault + ) + } } case _ => throw new IllegalArgumentException(s"ParquetType can only be created from Record. 
Got $f") @@ -143,9 +164,7 @@ object ParquetType { } val requestedSchema = { - val s = Schema.message(parquetType.schema(context.getConfiguration)): @nowarn( - "cat=deprecation" - ) + val s = Schema.message(parquetType.schema) // If reading Avro, roundtrip schema using parquet-avro converter to ensure array compatibility; // magnolify-parquet does not automatically wrap repeated fields into a group like parquet-avro does if (Schema.hasGroupedArray(context.getFileSchema)) { @@ -178,14 +197,13 @@ object ParquetType { override def getName: String = "magnolify" private var recordConsumer: RecordConsumer = null - private var conf: Configuration = null override def init(configuration: Configuration): hadoop.WriteSupport.WriteContext = { if (parquetType == null) { parquetType = SerializationUtils.fromBase64[ParquetType[T]](configuration.get(WriteTypeKey)) } - val schema = Schema.message(parquetType.schema(configuration)) + val schema = Schema.message(parquetType.schema) val metadata = new java.util.HashMap[String, String]() if ( @@ -197,7 +215,7 @@ object ParquetType { try { metadata.put( AVRO_SCHEMA_METADATA_KEY, - parquetType.avroSchema(configuration).toString() + parquetType.avroSchema.toString() ) } catch { // parquet-avro has greater schema restrictions than magnolify-parquet, e.g., parquet-avro does not @@ -211,7 +229,6 @@ object ParquetType { } } - this.conf = configuration new hadoop.WriteSupport.WriteContext(schema, metadata) } @@ -220,7 +237,7 @@ object ParquetType { override def write(record: T): Unit = { recordConsumer.startMessage() - parquetType.write(recordConsumer, record, conf) + parquetType.write(recordConsumer, record) recordConsumer.endMessage() } } diff --git a/parquet/src/main/scala/magnolify/parquet/Predicate.scala b/parquet/src/main/scala/magnolify/parquet/Predicate.scala index e902560d4..50581fd05 100644 --- a/parquet/src/main/scala/magnolify/parquet/Predicate.scala +++ b/parquet/src/main/scala/magnolify/parquet/Predicate.scala @@ -17,7 +17,6 @@ package magnolify.parquet import magnolify.shared.CaseMapper -import org.apache.hadoop.conf.Configuration import org.apache.parquet.filter2.predicate.Operators.Column import org.apache.parquet.filter2.predicate.{ FilterApi, @@ -54,7 +53,9 @@ object Predicate { pf: ParquetField.Primitive[ScalaFieldT] ): FilterPredicate = { val fieldType = - pf.schema(CaseMapper.identity, new Configuration()).asPrimitiveType().getPrimitiveTypeName + pf.schema(CaseMapper.identity, MagnolifyParquetProperties.WriteGroupedArraysDefault) + .asPrimitiveType() + .getPrimitiveTypeName val column = fieldType match { case PrimitiveTypeName.INT32 => FilterApi.intColumn(fieldName) @@ -67,7 +68,9 @@ object Predicate { } def wrap[T](addFn: (PrimitiveConverter, T) => Unit): T => ScalaFieldT = { - lazy val converter = pf.newConverter(pf.schema(CaseMapper.identity, new Configuration())) + lazy val converter = pf.newConverter( + pf.schema(CaseMapper.identity, MagnolifyParquetProperties.WriteGroupedArraysDefault) + ) value => { addFn(converter.asPrimitiveConverter(), value) converter.get diff --git a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala index 593155919..2d610aac8 100644 --- a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala @@ -58,7 +58,6 @@ class ParquetTypeSuite extends MagnolifySuite { val writer = tpe.writeBuilder(out).build() writer.write(t) writer.close() - val in = new 
TestInputFile(out.getBytes) val reader = tpe.readBuilder(in).build() val copy = reader.read() @@ -198,7 +197,13 @@ class ParquetTypeSuite extends MagnolifySuite { } test(s"AvroCompat") { - val pt = ParquetType[WithList] + def conf(writeGroupedArrays: Boolean): Configuration = { + val c = new Configuration(); + c.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, writeGroupedArrays) + c + } + + val ptNonGroupedArrays = ParquetType[WithList](conf(writeGroupedArrays = false)) // Assert that by default, Magnolify doesn't wrap repeated fields in group types val nonAvroCompliantSchema = """|message magnolify.parquet.WithList { | required binary s (STRING); @@ -206,17 +211,17 @@ class ParquetTypeSuite extends MagnolifySuite { |} |""".stripMargin - assert(!Schema.hasGroupedArray(pt.schema)) - assertEquals(nonAvroCompliantSchema, pt.schema.toString) + assert(!Schema.hasGroupedArray(ptNonGroupedArrays.schema)) + assertEquals(nonAvroCompliantSchema, ptNonGroupedArrays.schema.toString) // Assert that ReadSupport converts non-grouped arrays to grouped arrays depending on writer schema val asc = new AvroSchemaConverter() - val readSupport = pt.readSupport.init( + val readSupport = ptNonGroupedArrays.readSupport.init( new hadoop.InitContext( new Configuration(), Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava, asc.convert( - asc.convert(pt.schema) + asc.convert(ptNonGroupedArrays.schema) ) // Use converted Avro-compliant schema, which groups lists ) ) @@ -232,23 +237,16 @@ class ParquetTypeSuite extends MagnolifySuite { assert(Schema.hasGroupedArray(readSupport.getRequestedSchema)) assertEquals(avroCompliantSchema, readSupport.getRequestedSchema.toString) - // Assert that WriteSupport uses grouped schema when explicitly configured - { - val conf = new Configuration() - conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, true) - val writeContext = pt.writeSupport.init(conf) - assertEquals(avroCompliantSchema, writeContext.getSchema.toString) - assertEquals(pt.schema(conf), writeContext.getSchema) - } - // Assert that WriteSupport uses non-grouped schema otherwise - { - val conf = new Configuration() - conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, false) - val writeContext = pt.writeSupport.init(conf) - assertEquals(nonAvroCompliantSchema, writeContext.getSchema.toString) - assertEquals(pt.schema(conf), writeContext.getSchema) - } + val wc1 = ptNonGroupedArrays.writeSupport.init(new Configuration()) + assertEquals(nonAvroCompliantSchema, wc1.getSchema.toString) + assertEquals(ptNonGroupedArrays.schema, wc1.getSchema) + + // Assert that WriteSupport uses grouped schema when explicitly configured + val ptGroupedArrays = ParquetType[WithList](conf(writeGroupedArrays = true)) + val wc2 = ptGroupedArrays.writeSupport.init(new Configuration()) + assertEquals(avroCompliantSchema, wc2.getSchema.toString) + assertEquals(ptGroupedArrays.schema, wc2.getSchema) } } From da8fb9138c765b8272fc23be629fa01007d7eca2 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 17 Sep 2024 16:17:16 -0400 Subject: [PATCH 10/12] fixup --- jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala index 89d0a20d7..159c49d31 100644 --- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala +++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala @@ -16,7 +16,6 @@ package magnolify.jmh -import 
magnolify.jmh.MagnolifyBench.nested import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile} import java.util.concurrent.TimeUnit @@ -102,7 +101,7 @@ class ParquetReadState(pt: ParquetType[Nested]) { def setup(): Unit = { out = new TestOutputFile val writer = pt.writeBuilder(out).build() - writer.write(nested) + writer.write(MagnolifyBench.nested) writer.close() val in = new TestInputFile(out.getBytes) From 23bcbd964f225b9959e8570789904d51d68a4eea Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 18 Sep 2024 12:24:41 -0400 Subject: [PATCH 11/12] Make Parquet benchmark more granular --- build.sbt | 2 +- .../scala/magnolify/jmh/MagnolifyBench.scala | 72 +++++++++++------ .../jmh/ParquetInMemoryPageStore.scala | 77 +++++++++++++++++++ 3 files changed, 127 insertions(+), 24 deletions(-) create mode 100644 jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala diff --git a/build.sbt b/build.sbt index 3d9ec239c..2b29ff4bb 100644 --- a/build.sbt +++ b/build.sbt @@ -708,7 +708,7 @@ lazy val jmh: Project = project cats % Test, datastore % Test, guava % Test, - parquet % "test->test", + parquet % Test, protobuf % "test->test", scalacheck % Test, tensorflow % Test, diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala index 159c49d31..9935027c9 100644 --- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala +++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala @@ -16,13 +16,13 @@ package magnolify.jmh -import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile} +import magnolify.parquet.ParquetType.WriteSupport +import magnolify.parquet.{MagnolifyParquetProperties, ParquetType} import java.util.concurrent.TimeUnit import magnolify.scalacheck.auto._ import magnolify.test.Simple._ import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter} import org.scalacheck._ import org.openjdk.jmh.annotations._ @@ -94,39 +94,65 @@ class AvroBench { @State(Scope.Benchmark) class ParquetReadState(pt: ParquetType[Nested]) { - var out: TestOutputFile = null - var reader: ParquetReader[Nested] = null + import org.apache.parquet.io._ + import org.apache.parquet.column.impl.ColumnWriteStoreV1 + import org.apache.parquet.column.ParquetProperties + import org.apache.parquet.hadoop.api.InitContext + + var reader: RecordReader[Nested] = null @Setup(Level.Invocation) def setup(): Unit = { - out = new TestOutputFile - val writer = pt.writeBuilder(out).build() - writer.write(MagnolifyBench.nested) - writer.close() - - val in = new TestInputFile(out.getBytes) - reader = pt.readBuilder(in).build() - } + // Write page + val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema) + val memPageStore = new ParquetInMemoryPageStore(1) + val columns = new ColumnWriteStoreV1( + pt.schema, + memPageStore, + ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build + ) + val writeSupport = pt.writeSupport + val recordWriter = columnIO.getRecordWriter(columns) + writeSupport.prepareForWrite(recordWriter) + writeSupport.write(MagnolifyBench.nested) + recordWriter.flush() + columns.flush() - @TearDown(Level.Invocation) - def tearDown(): Unit = { - reader.close() + // Read and convert page + val conf = new Configuration() + val readSupport = pt.readSupport + reader = columnIO.getRecordReader( + memPageStore, + readSupport.prepareForRead( + conf, + new java.util.HashMap, + pt.schema, + 
readSupport.init(new InitContext(conf, new java.util.HashMap, pt.schema))) + ) } } @State(Scope.Benchmark) class ParquetWriteState(pt: ParquetType[Nested]) { - var writer: ParquetWriter[Nested] = null + import org.apache.parquet.io._ + import org.apache.parquet.column.impl.ColumnWriteStoreV1 + import org.apache.parquet.column.ParquetProperties + + var writer: WriteSupport[Nested] = null @Setup(Level.Invocation) def setup(): Unit = { - val out = new TestOutputFile - writer = pt.writeBuilder(out).build() - } - - @TearDown(Level.Invocation) - def tearDown(): Unit = { - writer.close() + val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema) + val memPageStore = new ParquetInMemoryPageStore(1) + val columns = new ColumnWriteStoreV1( + pt.schema, + memPageStore, + ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build + ) + val writeSupport = pt.writeSupport + val recordWriter = columnIO.getRecordWriter(columns) + writeSupport.prepareForWrite(recordWriter) + this.writer = writeSupport } } diff --git a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala new file mode 100644 index 000000000..5bb596d4b --- /dev/null +++ b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala @@ -0,0 +1,77 @@ +package magnolify.jmh + +import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator} +import org.apache.parquet.column.{ColumnDescriptor, Encoding} +import org.apache.parquet.column.page._ +import org.apache.parquet.column.statistics._ + +import scala.collection.mutable + +/** + * An in-memory Parquet page store modeled after parquet-java's MemPageStore, used to benchmark + * ParquetType conversion between Parquet Groups and Scala case classes + */ +class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore { + lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]() + lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]() + + override def getPageReader(path: ColumnDescriptor): PageReader = + readers.getOrElseUpdate(path, { + val writer = writers(path) + new ParquetInMemoryReader(writer.numValues, writer.pages.toList, writer.dictionaryPage) + }) + + override def getPageWriter(path: ColumnDescriptor): PageWriter = + writers.getOrElseUpdate(path, new ParquetInMemoryWriter()) + + override def getRowCount: Long = rowCount +} + +class ParquetInMemoryReader(valueCount: Long, pages: List[DataPage], dictionaryPage: DictionaryPage) extends PageReader { + lazy val pagesIt = pages.iterator + override def readDictionaryPage(): DictionaryPage = dictionaryPage + override def getTotalValueCount: Long = valueCount + override def readPage(): DataPage = pagesIt.next() +} + +class ParquetInMemoryWriter extends PageWriter { + var numRows = 0 + var numValues: Long = 0 + var memSize: Long = 0 + val pages = new mutable.ListBuffer[DataPage]() + var dictionaryPage: DictionaryPage = null + + override def writePage(bytesInput: BytesInput, valueCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { + writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding) + } + + override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], sizeStatistics: SizeStatistics, rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { + writePage(bytesInput, valueCount, 
rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding) + } + + override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { + pages.addOne(new DataPageV1( + bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)), + valueCount, + bytesInput.size().toInt, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding)) + memSize += bytesInput.size() + numRows += rowCount + numValues += valueCount + } + + override def writePageV2(rowCount: Int, nullCount: Int, valueCount: Int, repetitionLevels: BytesInput, definitionLevels: BytesInput, dataEncoding: Encoding, data: BytesInput, statistics: Statistics[_]): Unit = ??? + + override def getMemSize: Long = memSize + + override def allocatedSize(): Long = memSize + + override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit = { + this.dictionaryPage = dictionaryPage + } + + override def memUsageString(prefix: String): String = s"$prefix $memSize bytes" +} From eb205db5b405a4f57cdd204c802d6c6efbfe4a5d Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 19 Sep 2024 15:43:56 -0400 Subject: [PATCH 12/12] Cleanup --- .../scala/magnolify/jmh/MagnolifyBench.scala | 106 +----------------- .../jmh/ParquetInMemoryPageStore.scala | 77 ------------- .../parquet/MagnolifyParquetProperties.scala | 7 -- 3 files changed, 2 insertions(+), 188 deletions(-) delete mode 100644 jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala index 9935027c9..ccf745160 100644 --- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala +++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala @@ -16,18 +16,13 @@ package magnolify.jmh -import magnolify.parquet.ParquetType.WriteSupport -import magnolify.parquet.{MagnolifyParquetProperties, ParquetType} - import java.util.concurrent.TimeUnit + import magnolify.scalacheck.auto._ import magnolify.test.Simple._ -import org.apache.hadoop.conf.Configuration import org.scalacheck._ import org.openjdk.jmh.annotations._ -import scala.jdk.CollectionConverters._ - object MagnolifyBench { val seed: rng.Seed = rng.Seed(0) val prms: Gen.Parameters = Gen.Parameters.default @@ -92,103 +87,6 @@ class AvroBench { @Benchmark def avroSchema: Schema = AvroType[Nested].schema } -@State(Scope.Benchmark) -class ParquetReadState(pt: ParquetType[Nested]) { - import org.apache.parquet.io._ - import org.apache.parquet.column.impl.ColumnWriteStoreV1 - import org.apache.parquet.column.ParquetProperties - import org.apache.parquet.hadoop.api.InitContext - - var reader: RecordReader[Nested] = null - - @Setup(Level.Invocation) - def setup(): Unit = { - // Write page - val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema) - val memPageStore = new ParquetInMemoryPageStore(1) - val columns = new ColumnWriteStoreV1( - pt.schema, - memPageStore, - ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build - ) - val writeSupport = pt.writeSupport - val recordWriter = columnIO.getRecordWriter(columns) - writeSupport.prepareForWrite(recordWriter) - writeSupport.write(MagnolifyBench.nested) - recordWriter.flush() - columns.flush() - - // Read and convert page - val conf = new Configuration() - val readSupport = pt.readSupport - reader = columnIO.getRecordReader( - memPageStore, - readSupport.prepareForRead( - conf, - new java.util.HashMap, - pt.schema, - 
readSupport.init(new InitContext(conf, new java.util.HashMap, pt.schema))) - ) - } -} - -@State(Scope.Benchmark) -class ParquetWriteState(pt: ParquetType[Nested]) { - import org.apache.parquet.io._ - import org.apache.parquet.column.impl.ColumnWriteStoreV1 - import org.apache.parquet.column.ParquetProperties - - var writer: WriteSupport[Nested] = null - - @Setup(Level.Invocation) - def setup(): Unit = { - val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema) - val memPageStore = new ParquetInMemoryPageStore(1) - val columns = new ColumnWriteStoreV1( - pt.schema, - memPageStore, - ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build - ) - val writeSupport = pt.writeSupport - val recordWriter = columnIO.getRecordWriter(columns) - writeSupport.prepareForWrite(recordWriter) - this.writer = writeSupport - } -} - -object ParquetStates { - def confWithGroupedArraysProp(propValue: Boolean): Configuration = { - val conf = new Configuration() - conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue) - conf - } - class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false))) - class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false))) - - class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true))) - class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true))) -} - -@BenchmarkMode(Array(Mode.AverageTime)) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Thread) -class ParquetBench { - import MagnolifyBench._ - - @Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested) - @Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read() -} - -@BenchmarkMode(Array(Mode.AverageTime)) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Thread) -class ParquetAvroCompatBench { - import MagnolifyBench._ - - @Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested) - @Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read() -} - @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Thread) @@ -259,7 +157,7 @@ class ExampleBench { private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get private val example = exampleType.to(exampleNested).build() @Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested) - @Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap) + @Benchmark def exampleFrom: ExampleNested = exampleType.from(example) } // Collections are not supported diff --git a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala deleted file mode 100644 index 5bb596d4b..000000000 --- a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala +++ /dev/null @@ -1,77 +0,0 @@ -package magnolify.jmh - -import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator} -import org.apache.parquet.column.{ColumnDescriptor, Encoding} -import org.apache.parquet.column.page._ -import org.apache.parquet.column.statistics._ - -import scala.collection.mutable - -/** - * An in-memory Parquet page store modeled after parquet-java's 
MemPageStore, used to benchmark - * ParquetType conversion between Parquet Groups and Scala case classes - */ -class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore { - lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]() - lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]() - - override def getPageReader(path: ColumnDescriptor): PageReader = - readers.getOrElseUpdate(path, { - val writer = writers(path) - new ParquetInMemoryReader(writer.numValues, writer.pages.toList, writer.dictionaryPage) - }) - - override def getPageWriter(path: ColumnDescriptor): PageWriter = - writers.getOrElseUpdate(path, new ParquetInMemoryWriter()) - - override def getRowCount: Long = rowCount -} - -class ParquetInMemoryReader(valueCount: Long, pages: List[DataPage], dictionaryPage: DictionaryPage) extends PageReader { - lazy val pagesIt = pages.iterator - override def readDictionaryPage(): DictionaryPage = dictionaryPage - override def getTotalValueCount: Long = valueCount - override def readPage(): DataPage = pagesIt.next() -} - -class ParquetInMemoryWriter extends PageWriter { - var numRows = 0 - var numValues: Long = 0 - var memSize: Long = 0 - val pages = new mutable.ListBuffer[DataPage]() - var dictionaryPage: DictionaryPage = null - - override def writePage(bytesInput: BytesInput, valueCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { - writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding) - } - - override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], sizeStatistics: SizeStatistics, rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { - writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding) - } - - override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { - pages.addOne(new DataPageV1( - bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)), - valueCount, - bytesInput.size().toInt, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding)) - memSize += bytesInput.size() - numRows += rowCount - numValues += valueCount - } - - override def writePageV2(rowCount: Int, nullCount: Int, valueCount: Int, repetitionLevels: BytesInput, definitionLevels: BytesInput, dataEncoding: Encoding, data: BytesInput, statistics: Statistics[_]): Unit = ??? 
- - override def getMemSize: Long = memSize - - override def allocatedSize(): Long = memSize - - override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit = { - this.dictionaryPage = dictionaryPage - } - - override def memUsageString(prefix: String): String = s"$prefix $memSize bytes" -} diff --git a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala index 34a51a91d..a5123a47b 100644 --- a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala +++ b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala @@ -31,11 +31,4 @@ object MagnolifyParquetProperties { val ReadTypeKey = "parquet.type.read.type" val WriteTypeKey = "parquet.type.write.type" - - // Hash any Configuration values that might affect schema creation to use as part of Schema cache key - private[parquet] def hashValues(conf: Configuration): Int = - Option(conf.get(WriteGroupedArrays)) - .map(_.toBoolean) - .getOrElse(WriteGroupedArraysDefault) - .hashCode() }