diff --git a/avroparquet/src/test/scala-2/docs/scaladsl/AbstractAvroParquet.scala b/avroparquet/src/test/scala-2/docs/scaladsl/AbstractAvroParquet.scala
new file mode 100644
index 000000000..3d2da02e0
--- /dev/null
+++ b/avroparquet/src/test/scala-2/docs/scaladsl/AbstractAvroParquet.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package docs.scaladsl
+
+import com.sksamuel.avro4s.RecordFormat
+import org.apache.pekko.testkit.TestKit
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+
+import java.io.File
+import scala.reflect.io.Directory
+
+trait AbstractAvroParquet extends BeforeAndAfterAll with AbstractAvroParquetBase {
+  this: Suite with TestKit =>
+
+  val format: RecordFormat[Document] = RecordFormat[Document]
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+    val directory = new Directory(new File(folder))
+    directory.deleteRecursively()
+  }
+}
diff --git a/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
new file mode 100644
index 000000000..bd91822d6
--- /dev/null
+++ b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package docs.scaladsl
+
+import com.sksamuel.avro4s._
+import org.apache.pekko.testkit.TestKit
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+
+import java.io.File
+
+trait AbstractAvroParquet extends BeforeAndAfterAll with AbstractAvroParquetBase {
+  this: Suite with TestKit =>
+
+  implicit val toRecordDocument: ToRecord[Document] = ToRecord[Document](schema)
+  implicit val fromRecordDocument: FromRecord[Document] = FromRecord[Document](schema)
+  val format: RecordFormat[Document] = RecordFormat[Document](schema)
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+    deleteRecursively(new File(folder))
+  }
+
+  private def deleteRecursively(f: File): Boolean = {
+    if (f.isDirectory) f.listFiles match {
+        case null =>
+        case xs   => xs.foreach(deleteRecursively)
+      }
+    f.delete()
+  }
+}
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala b/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala
similarity index 88%
rename from avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala
rename to avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala
index 1369544d0..97a4052f8 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala
@@ -13,26 +13,18 @@
 
 package docs.scaladsl
 
-import java.io.File
-
-import org.apache.pekko.testkit.TestKit
-import com.sksamuel.avro4s.RecordFormat
 import org.apache.avro.Schema
 import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.{ AvroParquetReader, AvroParquetWriter, AvroReadSupport }
-import org.apache.parquet.hadoop.{ ParquetReader, ParquetWriter }
 import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.hadoop.{ ParquetReader, ParquetWriter }
 import org.scalacheck.Gen
-import org.scalatest.{ BeforeAndAfterAll, Suite }
 
-import scala.reflect.io.Directory
 import scala.util.Random
 
-trait AbstractAvroParquet extends BeforeAndAfterAll {
-  this: Suite with TestKit =>
-
+trait AbstractAvroParquetBase {
   case class Document(id: String, body: String)
 
   val schema: Schema = new Schema.Parser().parse(
@@ -42,13 +34,13 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
     Gen.oneOf(Seq(Document(id = Gen.alphaStr.sample.get, body = Gen.alphaLowerStr.sample.get)))
   val genDocuments: Int => Gen[List[Document]] = n => Gen.listOfN(n, genDocument)
 
-  val format: RecordFormat[Document] = RecordFormat[Document]
-
   val folder: String = "./" + Random.alphanumeric.take(8).mkString("")
 
   val genFinalFile: Gen[String] = for {
     fileName <- Gen.alphaLowerStr
-  } yield { folder + "/" + fileName + ".parquet" }
+  } yield {
+    folder + "/" + fileName + ".parquet"
+  }
 
   val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet"))
 
@@ -110,8 +102,8 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
     import org.apache.avro.generic.GenericRecord
     import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetReader
-    import org.apache.parquet.hadoop.util.HadoopInputFile
     import org.apache.parquet.hadoop.ParquetReader
+    import org.apache.parquet.hadoop.util.HadoopInputFile
 
     val file: String = "./sample/path/test.parquet"
     val writer: ParquetWriter[GenericRecord] =
@@ -124,10 +116,4 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
     if (writer != null && reader != null) { // forces val usage
     }
   }
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-    val directory = new Directory(new File(folder))
-    directory.deleteRecursively()
-  }
 }
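Note: the two new fixtures above differ only in how the avro4s conversion is wired. On Scala 2, `RecordFormat[Document]` is macro-derived from the case class alone; avro4s 5.x on Scala 3 builds it from the explicit `schema` plus `ToRecord`/`FromRecord` instances, as the scala-3 variant shows. Either way the specs see a single shared `format` member, so conversion code stays source-compatible. A minimal usage sketch (illustrative only; `Document`, `schema` and `format` come from the fixtures above):

```scala
import com.sksamuel.avro4s.Record

val doc: Document = Document(id = "id-1", body = "lorem")
val record: Record = format.to(doc)          // case class -> Avro record
val restored: Document = format.from(record) // Avro record -> case class
```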
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
index 626b15400..c52de6acc 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
@@ -13,6 +13,9 @@
 
 package docs.scaladsl
 
+import com.sksamuel.avro4s.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.ActorSystem
@@ -20,13 +23,10 @@ import pekko.stream.connectors.avroparquet.scaladsl.AvroParquetFlow
 import pekko.stream.scaladsl.{ Flow, Sink, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.testkit.TestKit
-import com.sksamuel.avro4s.Record
-import org.apache.avro.generic.GenericRecord
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpecLike
-import org.apache.parquet.hadoop.ParquetWriter
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
 
 class AvroParquetFlowSpec
     extends TestKit(ActorSystem("FlowSpec"))
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala
index b92170a37..18b414e94 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala
@@ -13,18 +13,18 @@
 
 package docs.scaladsl
 
+import com.sksamuel.avro4s.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.pekko
-import pekko.{ Done, NotUsed }
 import pekko.actor.ActorSystem
 import pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.testkit.TestKit
-import com.sksamuel.avro4s.{ Record, RecordFormat }
-import org.scalatest.concurrent.ScalaFutures
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.hadoop.ParquetWriter
+import pekko.{ Done, NotUsed }
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 
@@ -63,7 +63,7 @@ class AvroParquetSinkSpec
       val documents: List[Document] = genDocuments(n).sample.get
       val writer: ParquetWriter[Record] = parquetWriter[Record](file, conf, schema)
       // #init-sink
-      val records: List[Record] = documents.map(RecordFormat[Document].to(_))
+      val records: List[Record] = documents.map(format.to(_))
      val source: Source[Record, NotUsed] = Source(records)
       val result: Future[Done] = source
         .runWith(AvroParquetSink(writer))
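Note: the only behavioural change in `AvroParquetSinkSpec` is the record conversion. The inline `RecordFormat[Document].to(_)` derivation is replaced by the fixture's shared `format`, which also exists on Scala 3 where avro4s needs the explicit schema. A sketch of the pipeline the spec now exercises (all names come from the test fixtures above):

```scala
// Convert the generated documents with the shared RecordFormat and
// stream them into the Parquet sink backed by the fixture's writer.
val records: List[Record] = documents.map(format.to(_))
val result: Future[Done] = Source(records).runWith(AvroParquetSink(writer))
```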
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
index ea2300d6b..2fa88db1e 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
@@ -13,6 +13,9 @@
 
 package docs.scaladsl
 
+import com.sksamuel.avro4s.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.hadoop.ParquetReader
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.ActorSystem
@@ -21,11 +24,8 @@
 import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestKit
-import com.sksamuel.avro4s.Record
-import org.scalatest.concurrent.ScalaFutures
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.hadoop.ParquetReader
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
diff --git a/build.sbt b/build.sbt
index deeaf3022..a6b815c93 100644
--- a/build.sbt
+++ b/build.sbt
@@ -277,16 +277,7 @@ lazy val ironmq = pekkoConnectorProject(
 
 lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)
 
-val scalaReleaseSeparateSource: Def.SettingsDefinition = Compile / unmanagedSourceDirectories ++= {
-  if (scalaVersion.value.startsWith("2")) {
-    Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-2")
-  } else {
-    Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-3")
-  }
-}
-
-lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming",
-  Dependencies.JsonStreaming ++ scalaReleaseSeparateSource)
+lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)
 
 lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", Dependencies.Kinesis)
 
diff --git a/docs/src/main/paradox/avroparquet.md b/docs/src/main/paradox/avroparquet.md
index 270aad232..752eca4ee 100644
--- a/docs/src/main/paradox/avroparquet.md
+++ b/docs/src/main/paradox/avroparquet.md
@@ -29,7 +29,7 @@ Sometimes it might be useful to use a Parquet file as stream Source. For this we
 instance which will produce records as subtypes of `GenericRecord`, the Avro record's abstract representation.
 
 Scala
-: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala) { #prepare-source #init-reader }
+: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala) { #prepare-source #init-reader }
 
 Java
 : @@snip (/avroparquet/src/test/java/docs/javadsl/Examples.java) { #init-reader }
@@ -49,7 +49,7 @@ On the other hand, you can use `AvroParquetWriter` as the Apache Pekko Streams Sink
 In that case, its initialisation would require an instance of `org.apache.parquet.hadoop.ParquetWriter`. It will also expect any subtype of `GenericRecord` to be passed.
 
 Scala
-: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala) { #prepare-sink }
+: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala) { #prepare-sink }
 
 Java
 : @@snip (/avroparquet/src/test/java/docs/javadsl/AvroParquetSinkTest.java) { #init-writer }
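Note: the `build.sbt` change above drops the hand-rolled `scalaReleaseSeparateSource` setting because sbt 1.5+ already adds version-specific directories such as `src/test/scala-2` and `src/test/scala-3` to the unmanaged sources; that built-in behaviour is also what lets the new avroparquet fixtures compile without extra configuration. Roughly what sbt provides out of the box (a sketch, not the removed setting):

```scala
// Approximation of sbt's built-in handling of scala-2 / scala-3 folders.
Test / unmanagedSourceDirectories ++= {
  val base = (Test / sourceDirectory).value
  CrossVersion.partialVersion(scalaVersion.value) match {
    case Some((2, _)) => Seq(base / "scala-2")
    case Some((3, _)) => Seq(base / "scala-3")
    case _            => Seq.empty
  }
}
```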
diff --git a/project/Common.scala b/project/Common.scala
index 2f020dff0..adaa1f49b 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -51,7 +51,7 @@
     "com.google.api:com.google.cloud:com.google.iam:com.google.logging:" +
     "com.google.longrunning:com.google.protobuf:com.google.rpc:com.google.type"
 
-  override lazy val projectSettings = Dependencies.Common ++ Seq(
+  override lazy val projectSettings = Dependencies.CommonSettings ++ Seq(
     projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
     crossVersion := CrossVersion.binary,
     crossScalaVersions := Dependencies.ScalaVersions,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 88c3dab13..0708a1c6f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -56,7 +56,7 @@
   val log4jOverSlf4jVersion = "1.7.36"
   val jclOverSlf4jVersion = "1.7.36"
 
-  val Common = Seq(
+  val CommonSettings = Seq(
     // These libraries are added to all modules via the `Common` AutoPlugin
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-stream" % PekkoVersion))
@@ -154,13 +154,16 @@
     libraryDependencies ++= Seq(
       "com.google.jimfs" % "jimfs" % "1.2" % Test))
 
+  val avro4sVersion: Def.Initialize[String] = Def.setting {
+    if (Common.isScala3.value) "5.0.4" else "4.1.1"
+  }
+
   val AvroParquet = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
-      "org.apache.parquet" % "parquet-avro" % "1.10.1",
-      ("org.apache.hadoop" % "hadoop-client" % "3.2.1" % Test).exclude("log4j", "log4j"),
-      ("org.apache.hadoop" % "hadoop-common" % "3.2.1" % Test).exclude("log4j", "log4j"),
-      "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.1" % Test,
+      "org.apache.parquet" % "parquet-avro" % "1.10.1", // Apache2
+      ("org.apache.hadoop" % "hadoop-client" % "3.2.1" % Test).exclude("log4j", "log4j"), // Apache2
+      ("org.apache.hadoop" % "hadoop-common" % "3.2.1" % Test).exclude("log4j", "log4j"), // Apache2
+      "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion.value % Test,
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
       "org.specs2" %% "specs2-core" % "4.20.0" % Test, // MIT like: https://github.com/etorreborre/specs2/blob/master/LICENSE.txt
       "org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test // MIT like: http://www.slf4j.org/license.html
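Note: `avro4sVersion` relies on `Common.isScala3`, which is not part of this diff; a plausible definition of such a helper (hypothetical, shown only to make the setting above self-contained):

```scala
// Hypothetical sketch of the isScala3 setting referenced above.
val isScala3: Def.Initialize[Boolean] = Def.setting {
  CrossVersion.partialVersion(scalaVersion.value).exists(_._1 == 3)
}
```

With that in place, `avro4s-core` resolves to 5.0.4 on Scala 3 and 4.1.1 on Scala 2, matching the two fixture variants introduced at the top of the diff.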