[Very WIP] Make writes Configurable
clairemcginty committed Sep 12, 2024
1 parent a1ee149 commit 81e3f07
Showing 8 changed files with 229 additions and 200 deletions.
@@ -0,0 +1,39 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magnolify.parquet

import org.apache.hadoop.conf.Configuration

import java.util.Objects

object MagnolifyParquetProperties {
val WriteGroupedArrays: String = "magnolify.parquet.write-grouped-arrays"
val WriteGroupedArraysDefault: Boolean = false

val WriteAvroSchemaToMetadata: String = "magnolify.parquet.write-avro-schema"
val WriteAvroSchemaToMetadataDefault: Boolean = true

val ReadTypeKey = "parquet.type.read.type"
val WriteTypeKey = "parquet.type.write.type"

// Hash any Configuration values that might affect schema creation, for use as part of the schema cache key
private[parquet] def hashValues(conf: Configuration): Int = {
Objects.hash(
Option(conf.get(WriteGroupedArrays)).map(_.toBoolean).getOrElse(WriteGroupedArraysDefault)
)
}
}
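For reference, a minimal sketch of how a caller might set these properties on a Hadoop Configuration. The property keys come from the object above; how the Configuration is threaded into a writer is outside this diff, and the intent of write-avro-schema is inferred from its name.

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Opt in to the grouped (Avro-style) array encoding instead of the default flat repeated fields.
conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, true)
// Presumably controls whether the Avro schema is embedded in file metadata; defaults to true.
conf.setBoolean(MagnolifyParquetProperties.WriteAvroSchemaToMetadata, false)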
134 changes: 81 additions & 53 deletions parquet/src/main/scala/magnolify/parquet/ParquetField.scala
@@ -23,50 +23,62 @@ import magnolify.shims.FactoryCompat
import java.nio.{ByteBuffer, ByteOrder}
import java.time.LocalDate
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.io.ParquetDecodingException
import org.apache.parquet.io.api._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types}
import org.typelevel.scalaccompat.annotation.nowarn

import scala.annotation.implicitNotFound
import scala.collection.concurrent
import scala.collection.compat._

sealed trait ParquetField[T] extends Serializable {

@transient private lazy val schemaCache: concurrent.Map[UUID, Type] =
@transient private lazy val schemaCache: concurrent.Map[Int, concurrent.Map[UUID, Type]] =
concurrent.TrieMap.empty

protected def buildSchema(cm: CaseMapper): Type
def schema(cm: CaseMapper): Type =
schemaCache.getOrElseUpdate(cm.uuid, buildSchema(cm))
protected def buildSchema(cm: CaseMapper, conf: Configuration): Type

def schema(cm: CaseMapper, conf: Configuration): Type = {
val resolvedConf = Option(conf).getOrElse(new Configuration())
val confHash = MagnolifyParquetProperties.hashValues(resolvedConf)

schemaCache
.getOrElseUpdate(confHash, concurrent.TrieMap.empty)
.getOrElseUpdate(cm.uuid, buildSchema(cm, resolvedConf))
}
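// The cache is keyed first by a hash of the schema-affecting Configuration values
// (MagnolifyParquetProperties.hashValues), then by the CaseMapper's UUID, so schemas built
// under different write configurations (e.g. grouped vs. flat arrays) are cached independently.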

val hasAvroArray: Boolean = false
def fieldDocs(cm: CaseMapper): Map[String, String]
def typeDoc: Option[String]

protected val isGroup: Boolean = false
protected def isGroup(conf: Configuration): Boolean = false
protected def isEmpty(v: T): Boolean
protected final def nonEmpty(v: T): Boolean = !isEmpty(v)

def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit
def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit
def newConverter(writerSchema: Type): TypeConverter[T]

protected def writeGroup(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = {
if (isGroup) {
protected def writeGroup(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = {
val wrapGroup = isGroup(conf)
if (wrapGroup) {
c.startGroup()
}
write(c, v)(cm)
if (isGroup) {
write(c, v, conf)(cm)
if (wrapGroup) {
c.endGroup()
}
}
}

object ParquetField {
sealed trait Record[T] extends ParquetField[T] {
override protected val isGroup: Boolean = true
override protected def isGroup(conf: Configuration): Boolean = true

override protected def isEmpty(v: T): Boolean = false
}
@@ -79,10 +91,11 @@ object ParquetField {
val p = caseClass.parameters.head
val tc = p.typeclass
new ParquetField[T] {
override protected def buildSchema(cm: CaseMapper): Type = tc.buildSchema(cm)
override protected def buildSchema(cm: CaseMapper, conf: Configuration): Type =
tc.buildSchema(cm, conf)
override protected def isEmpty(v: T): Boolean = tc.isEmpty(p.dereference(v))
override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit =
tc.writeGroup(c, p.dereference(v))(cm)
override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit =
tc.writeGroup(c, p.dereference(v), conf)(cm)
override def newConverter(writerSchema: Type): TypeConverter[T] = {
val buffered = tc
.newConverter(writerSchema)
@@ -96,15 +109,13 @@ object ParquetField {
}
} else {
new Record[T] {
override def buildSchema(cm: CaseMapper): Type =
override def buildSchema(cm: CaseMapper, conf: Configuration): Type =
caseClass.parameters
.foldLeft(Types.requiredGroup()) { (g, p) =>
g.addField(Schema.rename(p.typeclass.schema(cm), cm.map(p.label)))
g.addField(Schema.rename(p.typeclass.schema(cm, conf), cm.map(p.label)))
}
.named(caseClass.typeName.full)

override val hasAvroArray: Boolean = caseClass.parameters.exists(_.typeclass.hasAvroArray)

override def fieldDocs(cm: CaseMapper): Map[String, String] =
caseClass.parameters.flatMap { param =>
val label = cm.map(param.label)
@@ -128,13 +139,13 @@ object ParquetField {
s"Type ${caseClass.typeName}"
)

override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = {
override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit = {
caseClass.parameters.foreach { p =>
val x = p.dereference(v)
if (p.typeclass.nonEmpty(x)) {
val name = cm.map(p.label)
c.startField(name, p.index)
p.typeclass.writeGroup(c, x)(cm)
p.typeclass.writeGroup(c, x, conf)(cm)
c.endField(name, p.index)
}
}
@@ -191,8 +202,9 @@ object ParquetField {
class FromWord[T] {
def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] =
new Primitive[U] {
override def buildSchema(cm: CaseMapper): Type = pf.schema(cm)
override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm)
override def buildSchema(cm: CaseMapper, conf: Configuration): Type = pf.schema(cm, conf)
override def write(c: RecordConsumer, v: U, conf: Configuration)(cm: CaseMapper): Unit =
pf.write(c, g(v), conf)(cm)
override def newConverter(writerSchema: Type): TypeConverter[U] =
pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f)

@@ -216,8 +228,10 @@ object ParquetField {
lta: => LogicalTypeAnnotation = null
): Primitive[T] =
new Primitive[T] {
override def buildSchema(cm: CaseMapper): Type = Schema.primitive(ptn, lta)
override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = f(c)(v)
override def buildSchema(cm: CaseMapper, conf: Configuration): Type =
Schema.primitive(ptn, lta)
override def write(c: RecordConsumer, v: T, conf: Configuration)(cm: CaseMapper): Unit =
f(c)(v)
override def newConverter(writerSchema: Type): TypeConverter[T] = g
override type ParquetT = UnderlyingT
}
@@ -283,16 +297,18 @@ object ParquetField {

implicit def pfOption[T](implicit t: ParquetField[T]): ParquetField[Option[T]] =
new ParquetField[Option[T]] {
override def buildSchema(cm: CaseMapper): Type =
Schema.setRepetition(t.schema(cm), Repetition.OPTIONAL)
override def buildSchema(cm: CaseMapper, conf: Configuration): Type =
Schema.setRepetition(t.schema(cm, conf), Repetition.OPTIONAL)
override protected def isEmpty(v: Option[T]): Boolean = v.forall(t.isEmpty)

override def fieldDocs(cm: CaseMapper): Map[String, String] = t.fieldDocs(cm)

override val typeDoc: Option[String] = None

override def write(c: RecordConsumer, v: Option[T])(cm: CaseMapper): Unit =
v.foreach(t.writeGroup(c, _)(cm))
override def write(c: RecordConsumer, v: Option[T], conf: Configuration)(
cm: CaseMapper
): Unit =
v.foreach(t.writeGroup(c, _, conf)(cm))

override def newConverter(writerSchema: Type): TypeConverter[Option[T]] = {
val buffered = t
@@ -313,14 +329,17 @@ object ParquetField {
pa: ParquetArray
): ParquetField[C[T]] = {
new ParquetField[C[T]] {
override val hasAvroArray: Boolean = pa match {
// Legacy compat with Magnolify <= 0.7; future versions will remove AvroCompat in favor of a
// Configuration-based approach
@nowarn("cat=deprecation")
val groupAvroArrays: Boolean = pa match {
case ParquetArray.default => false
case ParquetArray.AvroCompat.avroCompat => true
}

override def buildSchema(cm: CaseMapper): Type = {
val repeatedSchema = Schema.setRepetition(t.schema(cm), Repetition.REPEATED)
if (hasAvroArray) {
override def buildSchema(cm: CaseMapper, conf: Configuration): Type = {
val repeatedSchema = Schema.setRepetition(t.schema(cm, conf), Repetition.REPEATED)
if (isGroup(conf)) {
Types
.requiredGroup()
.addField(Schema.rename(repeatedSchema, AvroArrayField))
@@ -331,16 +350,21 @@ object ParquetField {
}
}

override protected val isGroup: Boolean = hasAvroArray
override protected def isGroup(conf: Configuration): Boolean =
groupAvroArrays || conf.getBoolean(
MagnolifyParquetProperties.WriteGroupedArrays,
MagnolifyParquetProperties.WriteGroupedArraysDefault
)
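// For illustration, the two schema shapes this toggle selects between for a repeated field,
// assuming a field named "xs" of Int elements and that AvroArrayField resolves to "array"
// (neither is shown in this diff):
//   write-grouped-arrays = false (default):  repeated int32 xs;
//   write-grouped-arrays = true (or the legacy AvroCompat import):
//     required group xs {
//       repeated int32 array;
//     }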

override protected def isEmpty(v: C[T]): Boolean = v.forall(t.isEmpty)

override def write(c: RecordConsumer, v: C[T])(cm: CaseMapper): Unit =
if (hasAvroArray) {
override def write(c: RecordConsumer, v: C[T], conf: Configuration)(cm: CaseMapper): Unit =
if (isGroup(conf)) {
c.startField(AvroArrayField, 0)
v.foreach(t.writeGroup(c, _)(cm))
v.foreach(t.writeGroup(c, _, conf)(cm))
c.endField(AvroArrayField, 0)
} else {
v.foreach(t.writeGroup(c, _)(cm))
v.foreach(t.writeGroup(c, _, conf)(cm))
}

override def newConverter(writerSchema: Type): TypeConverter[C[T]] = {
@@ -381,10 +405,10 @@ object ParquetField {
pfValue: ParquetField[V]
): ParquetField[Map[K, V]] = {
new ParquetField[Map[K, V]] {
override def buildSchema(cm: CaseMapper): Type = {
val keySchema = Schema.rename(pfKey.schema(cm), KeyField)
override def buildSchema(cm: CaseMapper, conf: Configuration): Type = {
val keySchema = Schema.rename(pfKey.schema(cm, conf), KeyField)
require(keySchema.isRepetition(Repetition.REQUIRED), "Map key must be required")
val valueSchema = Schema.rename(pfValue.schema(cm), ValueField)
val valueSchema = Schema.rename(pfValue.schema(cm, conf), ValueField)
val keyValue = Types
.repeatedGroup()
.addField(keySchema)
@@ -397,26 +421,26 @@ object ParquetField {
.named("map")
}

override val hasAvroArray: Boolean = pfKey.hasAvroArray || pfValue.hasAvroArray

override protected def isEmpty(v: Map[K, V]): Boolean = v.isEmpty

override def fieldDocs(cm: CaseMapper): Map[String, String] = Map.empty

override val typeDoc: Option[String] = None

override def write(c: RecordConsumer, v: Map[K, V])(cm: CaseMapper): Unit = {
override def write(c: RecordConsumer, v: Map[K, V], conf: Configuration)(
cm: CaseMapper
): Unit = {
if (v.nonEmpty) {
c.startGroup()
c.startField(KeyValueGroup, 0)
v.foreach { case (k, v) =>
c.startGroup()
c.startField(KeyField, 0)
pfKey.writeGroup(c, k)(cm)
pfKey.writeGroup(c, k, conf)(cm)
c.endField(KeyField, 0)
if (pfValue.nonEmpty(v)) {
c.startField(ValueField, 1)
pfValue.writeGroup(c, v)(cm)
pfValue.writeGroup(c, v, conf)(cm)
c.endField(ValueField, 1)
}
c.endGroup()
@@ -469,8 +493,10 @@ object ParquetField {

class LogicalTypeWord[T](lta: => LogicalTypeAnnotation) extends Serializable {
def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] {
override def buildSchema(cm: CaseMapper): Type = Schema.setLogicalType(pf.schema(cm), lta)
override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm)
override def buildSchema(cm: CaseMapper, conf: Configuration): Type =
Schema.setLogicalType(pf.schema(cm, conf), lta)
override def write(c: RecordConsumer, v: U, conf: Configuration)(cm: CaseMapper): Unit =
pf.write(c, g(v), conf)(cm)
override def newConverter(writerSchema: Type): TypeConverter[U] =
pf.newConverter(writerSchema).asInstanceOf[TypeConverter.Primitive[T]].map(f)

@@ -504,14 +530,16 @@ object ParquetField {
)

new Primitive[BigDecimal] {
override def buildSchema(cm: CaseMapper): Type =
override def buildSchema(cm: CaseMapper, conf: Configuration): Type =
Schema.primitive(
PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
LogicalTypeAnnotation.decimalType(scale, precision),
length
)

override def write(c: RecordConsumer, v: BigDecimal)(cm: CaseMapper): Unit =
override def write(c: RecordConsumer, v: BigDecimal, conf: Configuration)(
cm: CaseMapper
): Unit =
c.addBinary(Binary.fromConstantByteArray(Decimal.toFixed(v, precision, scale, length)))

override def newConverter(writerSchema: Type): TypeConverter[BigDecimal] =
@@ -535,10 +563,10 @@ object ParquetField {
logicalType[String](LogicalTypeAnnotation.enumType())(et.from)(et.to)

implicit val ptUuid: Primitive[UUID] = new Primitive[UUID] {
override def buildSchema(cm: CaseMapper): Type =
override def buildSchema(cm: CaseMapper, conf: Configuration): Type =
Schema.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, length = 16)

override def write(c: RecordConsumer, v: UUID)(cm: CaseMapper): Unit =
override def write(c: RecordConsumer, v: UUID, conf: Configuration)(cm: CaseMapper): Unit =
c.addBinary(
Binary.fromConstantByteArray(
ByteBuffer
(Diffs for the remaining 6 of 8 changed files are not shown here.)