Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scala 3 support for json streaming #146

Merged
merged 4 commits into from
Jun 9, 2023

Conversation

pjfanning
Copy link
Contributor

@pjfanning pjfanning commented Jun 8, 2023

Part of #126

A strange Scala 3 issue means that immutable.Queue.enqueue(ByteString) won't compile. I tried to produce a minimal case in scastie to report a Scala compiler bug but the minimal case works.

For now, I think using a mutable.Queue is ok and I can log an issue to keep checking when we upgrade Scala 3 to see if the issue is fixed.

@pjfanning pjfanning changed the base branch from main to scala3 June 8, 2023 22:46
Copy link
Contributor

@nvollmar nvollmar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@He-Pin
Copy link
Member

He-Pin commented Jun 9, 2023

@pjfanning What's the error looks like.

@mdedetrich
Copy link
Contributor

Yeah I am a bit wary of changing the underlying collection type from immutable to mutable. @pjfanning Can you post the error that you got?

@pjfanning
Copy link
Contributor Author

[error] -- [E007] Type Mismatch Error: /Users/pj.fanning/code/incubator-pekko-connectors/json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala:52:48 
[error] 52 |              buffer = buffer.enqueue(ByteString(value.toString))
[error]    |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^
[error]    |                    Found:    org.apache.pekko.util.ByteString
[error]    |                    Required: Iterable[org.apache.pekko.util.ByteString]
[error]    |
[error]    | longer explanation available when compiling with `-explain`
[warn] three warnings found
[error] one error found
[error] (json-streaming / Compile / compileIncremental) Compilation failed

There is a deprecated enqueue(Iterable[T]) function and for some reason Scala 3.3.0 compiler ignores the fact enqueue(T) is available.

@mdedetrich
Copy link
Contributor

There is a deprecated enqueue(Iterable[T]) function and for some reason Scala 3.3.0 compiler ignores the fact enqueue(T) is available.

What happens if you put ByteString(value.toString) into a List with a single value, i.e.

buffer.enqueue(List(ByteString(value.toString)))

?

@pjfanning
Copy link
Contributor Author

There is a deprecated enqueue(Iterable[T]) function and for some reason Scala 3.3.0 compiler ignores the fact enqueue(T) is available.

What happens if you put ByteString(value.toString) into a List with a single value, i.e.

buffer.enqueue(List(ByteString(value.toString)))

?

That works but isn't it wasteful to create a Seq to get around the compiler issue?

Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I just checked the code and using a mutable queue to get around this problem is fine since its internal however there is one thing I noticed, can we change private var buffer = Queue.empty[ByteString] to a val (since there is no reason for it to be a var, we are not overriding the reference since we are using a mutable collection).

@pjfanning
Copy link
Contributor Author

So I just checked the code and using a mutable queue to get around this problem is fine since its internal however there is one thing I noticed, can we change private var buffer = Queue.empty[ByteString] to a val (since there is no reason for it to be a var, we are not overriding the reference since we are using a mutable collection).

There is a 2nd place where the queue is recreated. I can see if I can clear the original queue instead of resetting the var.

@mdedetrich
Copy link
Contributor

There is a 2nd place where the queue is recreated. I can see if I can clear the original queue instead of resetting the var.

You should be able to just clear the queue?

@pjfanning
Copy link
Contributor Author

There is a 2nd place where the queue is recreated. I can see if I can clear the original queue instead of resetting the var.

You should be able to just clear the queue?

  • clearing a queue affects the iterator that was previously created for the queue (before the clear) - causes an IndexOutOfBoundsException
  • I'm trying a different solution at the moment

@pjfanning
Copy link
Contributor Author

@mdedetrich could you review the latest change when you get a chance?

@@ -69,8 +69,8 @@ private[pekko] final class JsonStreamReader(path: JsonPath) extends GraphStage[F
}

if (buffer.nonEmpty) {
emitMultiple(out, buffer)
buffer = Queue.empty[ByteString]
emitMultiple(out, buffer.toSeq.iterator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I just noticed this now and I am thinking that the List(ByteString(value.toString)) variant is better because while there may be a trivial box for a single List (which might get inlined anyways), the buffer.toSeq.iterator is basically going to copy the entire collection which is much worse especially considering that the size of this collection can be quite large

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning You can revert this line now since buffer is immutable

@mdedetrich
Copy link
Contributor

mdedetrich commented Jun 9, 2023

@pjfanning Ah I noticed the core problem, just use enqueueAll instead of enqueue (otherwise the solution is exactly the same)! ByteString is actually an iterator/collection and so enqueueAll is what we should be using, the definition of both of these functions should make it clear

  @deprecated("Use `enqueueAll` instead of `enqueue` to enqueue a collection of elements", "2.13.0")
  @`inline` final def enqueue[B >: A](iter: scala.collection.Iterable[B]) = enqueueAll(iter)

  /** Creates a new queue with all elements provided by an `Iterable` object
    *  added at the end of the old queue.
    *
    *  The elements are appended in the order they are given out by the
    *  iterator.
    *
    *  @param  iter        an iterable object
    */
  def enqueueAll[B >: A](iter: scala.collection.Iterable[B]): Queue[B] = appendedAll(iter)

In other words the current version is calling def enqueue[B >: A](iter: scala.collection.Iterable[B]) = enqueueAll(iter) which is deprecated in 2.13 and ByteString is a subtype of scala.collection.Iterable.

@pjfanning
Copy link
Contributor Author

@pjfanning Ah I noticed the core problem, just use enqueueAll instead of enqueue (otherwise the solution is exactly the same)! ByteString is actually an iterator/collection and so enqueueAll is what we should be using, the definition of both of these functions should make it clear

  @deprecated("Use `enqueueAll` instead of `enqueue` to enqueue a collection of elements", "2.13.0")
  @`inline` final def enqueue[B >: A](iter: scala.collection.Iterable[B]) = enqueueAll(iter)

  /** Creates a new queue with all elements provided by an `Iterable` object
    *  added at the end of the old queue.
    *
    *  The elements are appended in the order they are given out by the
    *  iterator.
    *
    *  @param  iter        an iterable object
    */
  def enqueueAll[B >: A](iter: scala.collection.Iterable[B]): Queue[B] = appendedAll(iter)

In other words the current version is calling def enqueue[B >: A](iter: scala.collection.Iterable[B]) = enqueueAll(iter) which is deprecated in 2.13 and ByteString is a subtype of scala.collection.Iterable.

That doesn't compile in Scala 2.

[error] /Users/pj.fanning/code/incubator-pekko-connectors/json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala:52:52: type mismatch;
[error]  found   : org.apache.pekko.util.ByteString
[error]  required: Iterable[org.apache.pekko.util.ByteString]
[error]               buffer = buffer.enqueueAll(ByteString(value.toString))
[error]                                                    ^
[error] one error found

buffer is Queue[ByteString] so only ByteStrings can be queued. ByteString might be an Iterable[Byte] but that is not compatible with Queue[ByteString].

@pjfanning pjfanning force-pushed the scala3-json-streaming branch from 3cc59be to e77ba9c Compare June 9, 2023 09:50
@pjfanning
Copy link
Contributor Author

I raised scala/scala3#17946

@pjfanning pjfanning force-pushed the scala3-json-streaming branch from ff88a98 to e44d75d Compare June 9, 2023 10:20
Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala
@pjfanning pjfanning force-pushed the scala3-json-streaming branch from 30d7c43 to c2d6a41 Compare June 9, 2023 10:26
Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect, approving assuming CI passes.

@mdedetrich
Copy link
Contributor

@pjfanning Just noticed that Iterable.single doesn't exist in Scala 2.12. I will fix this in Pekko.

@pjfanning
Copy link
Contributor Author

@pjfanning Just noticed that Iterable.single doesn't exist in Scala 2.12. I will fix this in Pekko.

I'm trying to add a QueueHelper with different Scala 2 and Scala 3 implementations in the json-streaming module.

@pjfanning
Copy link
Contributor Author

@pjfanning Just noticed that Iterable.single doesn't exist in Scala 2.12. I will fix this in Pekko.

I'm trying to add a QueueHelper with different Scala 2 and Scala 3 implementations in the json-streaming module.

@mdedetrich I added the QueueHelper

@mdedetrich
Copy link
Contributor

I'm trying to add a QueueHelper with different Scala 2 and Scala 3 implementations in the json-streaming module.

I have already added the Iterable.single method to Scala 2.12 under pekko's ccompat module. The PR is here apache/pekko#378

@mdedetrich
Copy link
Contributor

Okay so this solution is nicer in the sense that for Scala 2 its going to be faster/unchanged and only Scala 3 has the workaround. Still approved from me end.

@pjfanning
Copy link
Contributor Author

Okay so this solution is nicer in the sense that for Scala 2 its going to be faster/unchanged and only Scala 3 has the workaround. Still approved from me end.

My solution has the benefit that in Scala 2, no wrapping Iterable is used

@pjfanning pjfanning merged commit bb2dbdb into apache:scala3 Jun 9, 2023
@pjfanning pjfanning deleted the scala3-json-streaming branch June 9, 2023 11:48
pjfanning added a commit that referenced this pull request Jun 9, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Jun 10, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Jun 10, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Jun 26, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Jun 26, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Jun 27, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
mdedetrich pushed a commit to mdedetrich/pekko-connectors that referenced this pull request Jul 3, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Jul 14, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Aug 3, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Aug 6, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Aug 6, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Aug 11, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
mdedetrich pushed a commit to mdedetrich/pekko-connectors that referenced this pull request Aug 15, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Aug 17, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
pjfanning added a commit that referenced this pull request Aug 19, 2023
* scala 3 support for json streaming

Update JsonStreamReader.scala

make buffer a val

Update JsonStreamReader.scala

Update JsonStreamReader.scala

use Iterable.single

Delete MyByteString.scala

compile issue in scala 2.12

Update JsonStreamReader.scala

* Update Dependencies.scala

* use separate scala2/3 code

* Update QueueHelper.scala
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants