-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scala 3 support for json streaming #146
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@pjfanning What's the error looks like. |
Yeah I am a bit wary of changing the underlying collection type from |
There is a deprecated enqueue(Iterable[T]) function and for some reason Scala 3.3.0 compiler ignores the fact enqueue(T) is available. |
What happens if you put
? |
That works but isn't it wasteful to create a Seq to get around the compiler issue? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I just checked the code and using a mutable queue to get around this problem is fine since its internal however there is one thing I noticed, can we change private var buffer = Queue.empty[ByteString]
to a val
(since there is no reason for it to be a var, we are not overriding the reference since we are using a mutable collection).
There is a 2nd place where the queue is recreated. I can see if I can clear the original queue instead of resetting the var. |
You should be able to just clear the queue? |
|
@mdedetrich could you review the latest change when you get a chance? |
@@ -69,8 +69,8 @@ private[pekko] final class JsonStreamReader(path: JsonPath) extends GraphStage[F | |||
} | |||
|
|||
if (buffer.nonEmpty) { | |||
emitMultiple(out, buffer) | |||
buffer = Queue.empty[ByteString] | |||
emitMultiple(out, buffer.toSeq.iterator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I just noticed this now and I am thinking that the List(ByteString(value.toString))
variant is better because while there may be a trivial box for a single List
(which might get inlined anyways), the buffer.toSeq.iterator
is basically going to copy the entire collection which is much worse especially considering that the size of this collection can be quite large
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pjfanning You can revert this line now since buffer is immutable
@pjfanning Ah I noticed the core problem, just use @deprecated("Use `enqueueAll` instead of `enqueue` to enqueue a collection of elements", "2.13.0")
@`inline` final def enqueue[B >: A](iter: scala.collection.Iterable[B]) = enqueueAll(iter)
/** Creates a new queue with all elements provided by an `Iterable` object
* added at the end of the old queue.
*
* The elements are appended in the order they are given out by the
* iterator.
*
* @param iter an iterable object
*/
def enqueueAll[B >: A](iter: scala.collection.Iterable[B]): Queue[B] = appendedAll(iter) In other words the current version is calling |
That doesn't compile in Scala 2.
buffer is Queue[ByteString] so only ByteStrings can be queued. ByteString might be an Iterable[Byte] but that is not compatible with Queue[ByteString]. |
3cc59be
to
e77ba9c
Compare
I raised scala/scala3#17946 |
...streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala
Outdated
Show resolved
Hide resolved
json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/MyByteString.scala
Outdated
Show resolved
Hide resolved
ff88a98
to
e44d75d
Compare
Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala
30d7c43
to
c2d6a41
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect, approving assuming CI passes.
@pjfanning Just noticed that |
I'm trying to add a QueueHelper with different Scala 2 and Scala 3 implementations in the json-streaming module. |
@mdedetrich I added the QueueHelper |
I have already added the |
Okay so this solution is nicer in the sense that for Scala 2 its going to be faster/unchanged and only Scala 3 has the workaround. Still approved from me end. |
My solution has the benefit that in Scala 2, no wrapping Iterable is used |
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
* scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala
Part of #126
A strange Scala 3 issue means that immutable.Queue.enqueue(ByteString) won't compile. I tried to produce a minimal case in scastie to report a Scala compiler bug but the minimal case works.
For now, I think using a mutable.Queue is ok and I can log an issue to keep checking when we upgrade Scala 3 to see if the issue is fixed.