Skip to content

Commit

Permalink
scala 3 support for json streaming (#146)
Browse files Browse the repository at this point in the history
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
  • Loading branch information
pjfanning committed Jun 10, 2023
1 parent 7cae1f6 commit e18fd86
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 4 deletions.
11 changes: 10 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,16 @@ lazy val ironmq = pekkoConnectorProject(

lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)

lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)
val scalaReleaseSeparateSource: Def.SettingsDefinition = Compile / unmanagedSourceDirectories ++= {
if (scalaVersion.value.startsWith("2")) {
Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-2")
} else {
Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-3")
}
}

lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming",
Dependencies.JsonStreaming ++ scalaReleaseSeparateSource)

lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", Dependencies.Kinesis)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, derived from Akka.
*/

package org.apache.pekko.stream.connectors.json.impl

import org.apache.pekko.util.ByteString

import scala.collection.immutable.Queue

private[impl] object QueueHelper {
@inline final def enqueue(queue: Queue[ByteString], byteString: ByteString): Queue[ByteString] =
queue.enqueue(byteString)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, derived from Akka.
*/

package org.apache.pekko.stream.connectors.json.impl

import org.apache.pekko.util.ByteString

import scala.collection.immutable.Queue

private[impl] object QueueHelper {
inline final def enqueue(queue: Queue[ByteString], byteString: ByteString): Queue[ByteString] = {
// see https://github.com/lampepfl/dotty/issues/17946
queue.enqueueAll(Iterable.single(byteString))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ private[pekko] final class JsonStreamReader(path: JsonPath) extends GraphStage[F
private val config = surfer.configBuilder
.bind(path,
new JsonPathListener {
override def onValue(value: Any, context: ParsingContext): Unit =
buffer = buffer.enqueue(ByteString(value.toString))
override def onValue(value: Any, context: ParsingContext): Unit = {
// see https://github.com/lampepfl/dotty/issues/17946
buffer = QueueHelper.enqueue(buffer, ByteString(value.toString))
}
})
.build
private val parser = surfer.createNonBlockingParser(config)
Expand Down
1 change: 0 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ object Dependencies {
"https://repository.jboss.org/nexus/content/groups/public")) +: externalResolvers.value)

val JsonStreaming = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"com.github.jsurfer" % "jsurfer-jackson" % "1.6.0" // MIT
) ++ JacksonDatabindDependencies)
Expand Down

0 comments on commit e18fd86

Please sign in to comment.