support scala3 in avroparquet (#158)
* support scala3 in avroparquet

add some scala3 tests

Update AvroParquetSinkSpec.scala

Update avroparquet.md

Update AbstractAvroParquet.scala

Update AbstractAvroParquet.scala

* refactor test code

* Update AbstractAvroParquetBase.scala

* fix doc links

* use Common.isScala3

* Update Dependencies.scala

* Update build.sbt
pjfanning authored Jun 12, 2023
1 parent 38fa9bb commit 6c8bf6c
Showing 10 changed files with 105 additions and 51 deletions.
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, derived from Akka.
*/

/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.scaladsl

import com.sksamuel.avro4s.RecordFormat
import org.apache.pekko.testkit.TestKit
import org.scalatest.{ BeforeAndAfterAll, Suite }

import java.io.File
import scala.reflect.io.Directory

trait AbstractAvroParquet extends BeforeAndAfterAll with AbstractAvroParquetBase {
  this: Suite with TestKit =>

  val format: RecordFormat[Document] = RecordFormat[Document]

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    val directory = new Directory(new File(folder))
    directory.deleteRecursively()
  }
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, derived from Akka.
*/

/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.scaladsl

import com.sksamuel.avro4s._
import org.apache.pekko.testkit.TestKit
import org.scalatest.{ BeforeAndAfterAll, Suite }

import java.io.File

trait AbstractAvroParquet extends BeforeAndAfterAll with AbstractAvroParquetBase {
  this: Suite with TestKit =>

  implicit val toRecordDocument: ToRecord[Document] = ToRecord[Document](schema)
  implicit val fromRecordDocument: FromRecord[Document] = FromRecord[Document](schema)
  val format: RecordFormat[Document] = RecordFormat[Document](schema)

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    deleteRecursively(new File(folder))
  }

  private def deleteRecursively(f: File): Boolean = {
    if (f.isDirectory) f.listFiles match {
      case null =>
      case xs => xs.foreach(deleteRecursively)
    }
    f.delete()
  }
}
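The two new traits above are the Scala 2 and Scala 3 variants of the test fixture: on Scala 2, avro4s 4.x derives RecordFormat[Document] via macros, while the Scala 3 variant (avro4s 5.x) builds ToRecord/FromRecord from the explicitly parsed schema. As a minimal round-trip sketch of what `format` gives the specs — illustrative only, written against the Scala 2 / avro4s 4.x derivation, with the Document case class inlined rather than taken from AbstractAvroParquetBase:

import com.sksamuel.avro4s.{ Record, RecordFormat }

object RecordFormatRoundTrip extends App {
  case class Document(id: String, body: String)

  // Scala 2 / avro4s 4.x: the format is derived from the case class.
  // The Scala 3 trait above builds the equivalent from an explicit Schema instead.
  val format: RecordFormat[Document] = RecordFormat[Document]

  val doc = Document(id = "id-1", body = "lorem ipsum")
  val record: Record = format.to(doc) // case class -> Avro record
  val back: Document = format.from(record) // Avro record -> case class

  assert(back == doc)
}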
@@ -13,26 +13,18 @@

package docs.scaladsl

import java.io.File

import org.apache.pekko.testkit.TestKit
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{ AvroParquetReader, AvroParquetWriter, AvroReadSupport }
import org.apache.parquet.hadoop.{ ParquetReader, ParquetWriter }
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.hadoop.{ ParquetReader, ParquetWriter }
import org.scalacheck.Gen
import org.scalatest.{ BeforeAndAfterAll, Suite }

import scala.reflect.io.Directory
import scala.util.Random

trait AbstractAvroParquet extends BeforeAndAfterAll {
this: Suite with TestKit =>

trait AbstractAvroParquetBase {
case class Document(id: String, body: String)

val schema: Schema = new Schema.Parser().parse(
@@ -42,13 +34,13 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
Gen.oneOf(Seq(Document(id = Gen.alphaStr.sample.get, body = Gen.alphaLowerStr.sample.get)))
val genDocuments: Int => Gen[List[Document]] = n => Gen.listOfN(n, genDocument)

val format: RecordFormat[Document] = RecordFormat[Document]

val folder: String = "./" + Random.alphanumeric.take(8).mkString("")

val genFinalFile: Gen[String] = for {
fileName <- Gen.alphaLowerStr
} yield { folder + "/" + fileName + ".parquet" }
} yield {
folder + "/" + fileName + ".parquet"
}

val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet"))

@@ -110,8 +102,8 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val file: String = "./sample/path/test.parquet"
val writer: ParquetWriter[GenericRecord] =
@@ -124,10 +116,4 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
if (writer != null && reader != null) { // forces val usage
}
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
val directory = new Directory(new File(folder))
directory.deleteRecursively()
}
}
@@ -13,20 +13,20 @@

package docs.scaladsl

import com.sksamuel.avro4s.Record
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream.connectors.avroparquet.scaladsl.AvroParquetFlow
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.testkit.TestKit
import com.sksamuel.avro4s.Record
import org.apache.avro.generic.GenericRecord
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.apache.parquet.hadoop.ParquetWriter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class AvroParquetFlowSpec
extends TestKit(ActorSystem("FlowSpec"))
@@ -13,18 +13,18 @@

package docs.scaladsl

import com.sksamuel.avro4s.Record
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ActorSystem
import pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSink
import pekko.stream.scaladsl.Source
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.testkit.TestKit
import com.sksamuel.avro4s.{ Record, RecordFormat }
import org.scalatest.concurrent.ScalaFutures
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetWriter
import pekko.{ Done, NotUsed }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

@@ -63,7 +63,7 @@ class AvroParquetSinkSpec
val documents: List[Document] = genDocuments(n).sample.get
val writer: ParquetWriter[Record] = parquetWriter[Record](file, conf, schema)
// #init-sink
val records: List[Record] = documents.map(RecordFormat[Document].to(_))
val records: List[Record] = documents.map(format.to(_))
val source: Source[Record, NotUsed] = Source(records)
val result: Future[Done] = source
.runWith(AvroParquetSink(writer))
@@ -13,6 +13,9 @@

package docs.scaladsl

import com.sksamuel.avro4s.Record
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetReader
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
@@ -21,11 +24,8 @@ import pekko.stream.scaladsl.{ Keep, Source }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestKit
import com.sksamuel.avro4s.Record
import org.scalatest.concurrent.ScalaFutures
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.hadoop.ParquetReader
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

11 changes: 1 addition & 10 deletions build.sbt
@@ -256,16 +256,7 @@ lazy val ironmq = pekkoConnectorProject(

lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)

val scalaReleaseSeparateSource: Def.SettingsDefinition = Compile / unmanagedSourceDirectories ++= {
if (scalaVersion.value.startsWith("2")) {
Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-2")
} else {
Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-3")
}
}

lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming",
Dependencies.JsonStreaming ++ scalaReleaseSeparateSource)
lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)

lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", Dependencies.Kinesis)

4 changes: 2 additions & 2 deletions docs/src/main/paradox/avroparquet.md
@@ -29,7 +29,7 @@ Sometimes it might be useful to use a Parquet file as stream Source. For this we
instance which will produce records as subtypes of `GenericRecord`, the Avro record's abstract representation.

Scala
: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala) { #prepare-source #init-reader }
: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala) { #prepare-source #init-reader }

Java
: @@snip (/avroparquet/src/test/java/docs/javadsl/Examples.java) { #init-reader }
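For orientation, the #prepare-source / #init-reader snippets referenced above boil down to building a Hadoop-backed AvroParquetReader and wrapping it in the connector's AvroParquetSource. A rough sketch of that shape, assuming a local file path, a default Hadoop Configuration and a hypothetical helper object rather than quoting the snipped test code:

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{ AvroParquetReader, AvroReadSupport }
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSource
import org.apache.pekko.stream.scaladsl.Source

object ParquetSourceExample {
  // Hypothetical helper: builds a Source of GenericRecord from a Parquet file on the local filesystem.
  def source(file: String): Source[GenericRecord, NotUsed] = {
    val conf = new Configuration()
    // Enable Avro compatibility mode so records are read back as GenericRecord.
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

    val reader: ParquetReader[GenericRecord] =
      AvroParquetReader
        .builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf))
        .withConf(conf)
        .build()

    AvroParquetSource(reader)
  }
}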
@@ -49,7 +49,7 @@ On the other hand, you can use `AvroParquetWriter` as the Apache Pekko Streams S
In that case, its initialisation would require an instance of `org.apache.parquet.hadoop.ParquetWriter`. It will also expect any subtype of `GenericRecord` to be passed.

Scala
: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala) { #prepare-sink }
: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala) { #prepare-sink }

Java
: @@snip (/avroparquet/src/test/java/docs/javadsl/AvroParquetSinkTest.java) { #init-writer }
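Likewise, the #prepare-sink / #init-writer snippets amount to building an org.apache.parquet.hadoop.ParquetWriter via AvroParquetWriter and handing it to AvroParquetSink. A sketch under the same assumptions (explicit Schema, local path, hypothetical helper object):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.pekko.Done
import org.apache.pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSink
import org.apache.pekko.stream.scaladsl.Sink

import scala.concurrent.Future

object ParquetSinkExample {
  // Hypothetical helper: builds a Sink that writes GenericRecords to a local Parquet file.
  def sink(file: String, schema: Schema): Sink[GenericRecord, Future[Done]] = {
    val conf = new Configuration()

    val writer: ParquetWriter[GenericRecord] =
      AvroParquetWriter
        .builder[GenericRecord](new Path(file))
        .withConf(conf)
        .withSchema(schema)
        .build()

    AvroParquetSink(writer)
  }
}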
2 changes: 1 addition & 1 deletion project/Common.scala
@@ -51,7 +51,7 @@ object Common extends AutoPlugin {
"com.google.api:com.google.cloud:com.google.iam:com.google.logging:" +
"com.google.longrunning:com.google.protobuf:com.google.rpc:com.google.type"

override lazy val projectSettings = Dependencies.Common ++ Seq(
override lazy val projectSettings = Dependencies.CommonSettings ++ Seq(
projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
crossVersion := CrossVersion.binary,
crossScalaVersions := Dependencies.ScalaVersions,
9 changes: 6 additions & 3 deletions project/Dependencies.scala
@@ -56,7 +56,7 @@ object Dependencies {
val log4jOverSlf4jVersion = "1.7.36"
val jclOverSlf4jVersion = "1.7.36"

val Common = Seq(
val CommonSettings = Seq(
// These libraries are added to all modules via the `Common` AutoPlugin
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-stream" % PekkoVersion))
@@ -164,13 +164,16 @@ object Dependencies {
"com.google.jimfs" % "jimfs" % "1.2" % Test // ApacheV2
))

val avro4sVersion: Def.Initialize[String] = Def.setting {
if (Common.isScala3.value) "5.0.4" else "4.1.1"
}

val AvroParquet = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.parquet" % "parquet-avro" % "1.10.1", // Apache2
("org.apache.hadoop" % "hadoop-client" % "3.2.1" % Test).exclude("log4j", "log4j"), // Apache2
("org.apache.hadoop" % "hadoop-common" % "3.2.1" % Test).exclude("log4j", "log4j"), // Apache2
"com.sksamuel.avro4s" %% "avro4s-core" % "4.1.1" % Test,
"com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion.value % Test,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
"org.specs2" %% "specs2-core" % "4.20.0" % Test, // MIT like: https://github.com/etorreborre/specs2/blob/master/LICENSE.txt
"org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test // MIT like: http://www.slf4j.org/license.html
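Common.isScala3 is referenced by avro4sVersion above, but its definition is not part of this excerpt; a plausible shape for it — an assumption, not taken from the commit — is an sbt setting derived from scalaVersion, in the style of the version check removed from build.sbt:

// project/Common.scala (assumed sketch, not shown in this diff)
import sbt._
import sbt.Keys._

object Common extends AutoPlugin {
  // True when the current module is being compiled with Scala 3.
  val isScala3: Def.Initialize[Boolean] = Def.setting {
    scalaVersion.value.startsWith("3")
  }
}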
