Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix #766) Deprecate AvroCompat, replace automatic schema detection on read + Configurable write #996

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ lazy val tools = project
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion,
"org.apache.avro" % "avro" % avroVersion % Provided,
"org.apache.parquet" % "parquet-hadoop" % parquetVersion,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
Copy link
Contributor Author

@clairemcginty clairemcginty Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence about relying so heavily on the hadoop Configuration class, since it pulls in hadoop-common artifact and links us more tightly with Hadoop. Parquet is trying to move away from Configuration and onto their own ParquetConfiguration class, which we could use instead. However, it might be confusing for Scio users since Scio is heavily dependent on Configuration and we don't have immediate plans to offboard from it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I might pull this out into a separate PR. will update shortly

"org.typelevel" %% "paiges-core" % paigesVersion
)
)
Expand All @@ -707,6 +708,7 @@ lazy val jmh: Project = project
cats % Test,
datastore % Test,
guava % Test,
parquet % "test->test",
protobuf % "test->test",
scalacheck % Test,
tensorflow % Test,
Expand All @@ -726,7 +728,12 @@ lazy val jmh: Project = project
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test,
"com.google.cloud.datastore" % "datastore-v1-proto-client" % datastoreVersion % Test,
"org.apache.avro" % "avro" % avroVersion % Test,
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test,
"org.apache.parquet" % "parquet-avro" % parquetVersion % Test,
"org.apache.parquet" % "parquet-column" % parquetVersion % Test,
"org.apache.parquet" % "parquet-hadoop" % parquetVersion % Test,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Test
)
)

Expand Down
80 changes: 78 additions & 2 deletions jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@

package magnolify.jmh

import java.util.concurrent.TimeUnit
import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile}

import java.util.concurrent.TimeUnit
import magnolify.scalacheck.auto._
import magnolify.test.Simple._
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
import org.scalacheck._
import org.openjdk.jmh.annotations._

import scala.jdk.CollectionConverters._

object MagnolifyBench {
val seed: rng.Seed = rng.Seed(0)
val prms: Gen.Parameters = Gen.Parameters.default
Expand Down Expand Up @@ -87,6 +92,77 @@ class AvroBench {
@Benchmark def avroSchema: Schema = AvroType[Nested].schema
}

@State(Scope.Benchmark)
class ParquetReadState(pt: ParquetType[Nested]) {
var out: TestOutputFile = null
var reader: ParquetReader[Nested] = null

@Setup(Level.Invocation)
def setup(): Unit = {
out = new TestOutputFile
val writer = pt.writeBuilder(out).build()
writer.write(MagnolifyBench.nested)
writer.close()

val in = new TestInputFile(out.getBytes)
reader = pt.readBuilder(in).build()
}

@TearDown(Level.Invocation)
def tearDown(): Unit = {
reader.close()
}
}

@State(Scope.Benchmark)
class ParquetWriteState(pt: ParquetType[Nested]) {
var writer: ParquetWriter[Nested] = null

@Setup(Level.Invocation)
def setup(): Unit = {
val out = new TestOutputFile
writer = pt.writeBuilder(out).build()
}

@TearDown(Level.Invocation)
def tearDown(): Unit = {
writer.close()
}
}

object ParquetStates {
def confWithGroupedArraysProp(propValue: Boolean): Configuration = {
val conf = new Configuration()
conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue)
conf
}
class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false)))
class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false)))

class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true)))
class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true)))
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetBench {
import MagnolifyBench._

@Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested)
@Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to capture a "true" read benchmark for Parquet since there's so much happening under the hood here (reading and caching the row group, for example). But at least this can be used to track positive and negative trends

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, this was bothering me, so I re-implemented this benchmark so that instead of reading/writing entire file streams, it's directly writing/reading Pages (smallest unit of IO granularity in Parquet). This skips a lot of the overhead of the file/rowgroup IO, so that we're able to specifically benchmark ParquetType's functionality: converting between parquet Groups and Scala case classes.

}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetAvroCompatBench {
import MagnolifyBench._

@Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested)
@Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read()
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
Expand Down Expand Up @@ -157,7 +233,7 @@ class ExampleBench {
private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get
private val example = exampleType.to(exampleNested).build()
@Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested)
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example)
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
}

// Collections are not supported
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magnolify.parquet

import org.apache.hadoop.conf.Configuration

/**
* Properties for reading and writing Magnolify ParquetType classes, configurable via a Hadoop
* [[Configuration]] instance.
*/
object MagnolifyParquetProperties {
val WriteGroupedArrays: String = "magnolify.parquet.write-grouped-arrays"
val WriteGroupedArraysDefault: Boolean = false

val WriteAvroSchemaToMetadata: String = "magnolify.parquet.write-avro-schema"
val WriteAvroSchemaToMetadataDefault: Boolean = true

val ReadTypeKey = "parquet.type.read.type"
val WriteTypeKey = "parquet.type.write.type"

// Hash any Configuration values that might affect schema creation to use as part of Schema cache key
private[parquet] def hashValues(conf: Configuration): Int =
Option(conf.get(WriteGroupedArrays))
.map(_.toBoolean)
.getOrElse(WriteGroupedArraysDefault)
.hashCode()

Check warning on line 40 in parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala

View check run for this annotation

Codecov / codecov/patch

parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala#L40

Added line #L40 was not covered by tests
}
Loading
Loading