From c59ed56cb418994463dcca82c85cee030db36634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 12:10:02 -0500 Subject: [PATCH 01/10] Generalize the runSubscription mechanism of StreamPublisher --- .../src/main/scala/fs2/interop/flow/StreamPublisher.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 3cc596f2a2..2cd8b421f7 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -41,7 +41,7 @@ import scala.util.control.NoStackTrace */ private[flow] final class StreamPublisher[F[_], A] private ( stream: Stream[F, A], - startDispatcher: Dispatcher[F] + runSubscription: F[Unit] => Unit )(implicit F: Async[F]) extends Publisher[A] { override def subscribe(subscriber: Subscriber[_ >: A]): Unit = { @@ -50,7 +50,7 @@ private[flow] final class StreamPublisher[F[_], A] private ( "The subscriber provided to subscribe must not be null" ) try - startDispatcher.unsafeRunAndForget( + runSubscription( StreamSubscription.subscribe(stream, subscriber) ) catch { @@ -69,7 +69,7 @@ private[flow] object StreamPublisher { stream: Stream[F, A] )(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] = Dispatcher.parallel[F](await = false).map { startDispatcher => - new StreamPublisher(stream, startDispatcher) + new StreamPublisher(stream, startDispatcher.unsafeRunAndForget) } private object CanceledStreamPublisherException From b504e46735b0a35443fb3b73c17aed720a173089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 12:17:34 -0500 Subject: [PATCH 02/10] Add StreamPublisher.unsafe --- .../scala/fs2/interop/flow/StreamPublisher.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 2cd8b421f7..2208613bc2 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -23,8 +23,10 @@ package fs2 package interop package flow +import cats.effect.IO import cats.effect.kernel.{Async, Resource} import cats.effect.std.Dispatcher +import cats.effect.unsafe.IORuntime import java.util.Objects.requireNonNull import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} @@ -69,9 +71,20 @@ private[flow] object StreamPublisher { stream: Stream[F, A] )(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] = Dispatcher.parallel[F](await = false).map { startDispatcher => - new StreamPublisher(stream, startDispatcher.unsafeRunAndForget) + new StreamPublisher( + stream, + runSubscription = startDispatcher.unsafeRunAndForget + ) } + def unsafe[A]( + stream: Stream[IO, A] + )(implicit runtime: IORuntime): StreamPublisher[IO, A] = + new StreamPublisher( + stream, + runSubscription = _.unsafeRunAndForget() + ) + private object CanceledStreamPublisherException extends IllegalStateException( "This StreamPublisher is not longer accepting subscribers" From 1804b436a0a74df7dec1e0a72ea4fec8a1e8a3dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 12:30:41 -0500 Subject: [PATCH 03/10] Add flow.unsafeToPublisher --- .../main/scala/fs2/interop/flow/package.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/package.scala b/core/shared/src/main/scala/fs2/interop/flow/package.scala index 50a9d64f9c..64020b1c7f 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/package.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/package.scala @@ -22,7 +22,9 @@ package fs2 package interop +import cats.effect.IO import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize} @@ -137,9 +139,10 @@ package object flow { * Closing the [[Resource]] means gracefully shutting down all active subscriptions. * Thus, no more elements will be published. * - * @note This Publisher can be reused for multiple Subscribers, - * each subscription will re-run the [[Stream]] from the beginning. + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. * + * @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. * @see [[subscribeStream]] for a simpler version that only requires a [[Subscriber]]. * * @param stream The [[Stream]] to transform. @@ -151,6 +154,24 @@ package object flow { ): Resource[F, Publisher[A]] = StreamPublisher(stream) + /** Creates a [[Publisher]] from a [[Stream]]. + * + * The stream is only ran when elements are requested. + * + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. + * + * @see [[toPublisher]] for a safe version that returns a [[Resource]]. + * + * @param stream The [[Stream]] to transform. + */ + def unsafeToPublisher[A]( + stream: Stream[IO, A] + )(implicit + runtime: IORuntime + ): Publisher[A] = + StreamPublisher.unsafe(stream) + /** Allows subscribing a [[Subscriber]] to a [[Stream]]. * * The returned program will run until From b0e5710504ce7561c0994c863e6da5918cda043c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 12:31:21 -0500 Subject: [PATCH 04/10] Add stream.unsafeToPublisher() to flow.syntax --- .../main/scala/fs2/interop/flow/syntax.scala | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index 99fa0e23c7..eaa5796f70 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -23,7 +23,9 @@ package fs2 package interop package flow +import cats.effect.IO import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime import java.util.concurrent.Flow.{Publisher, Subscriber} @@ -66,9 +68,10 @@ object syntax { * Closing the [[Resource]] means gracefully shutting down all active subscriptions. * Thus, no more elements will be published. * - * @note This Publisher can be reused for multiple Subscribers, - * each subscription will re-run the [[Stream]] from the beginning. + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. * + * @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. * @see [[subscribe]] for a simpler version that only requires a [[Subscriber]]. */ def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] = @@ -86,6 +89,25 @@ object syntax { flow.subscribeStream(stream, subscriber) } + implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal { + + /** Creates a [[Publisher]] from a [[Stream]]. + * + * The stream is only ran when elements are requested. + * + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. + * + * @see [[toPublisher]] for a safe version that returns a [[Resource]]. + * + * @param stream The [[Stream]] to transform. + */ + def unsafeToPublisher()(implicit + runtime: IORuntime + ): Publisher[A] = + flow.unsafeToPublisher(stream) + } + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A], From a9e0747c741a495acc8ce8ebeb3b039c7de9c2e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 12:59:45 -0500 Subject: [PATCH 05/10] Optimize StreamPublisher --- .../fs2/interop/flow/StreamPublisher.scala | 51 +++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 2208613bc2..9727b055fb 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -41,12 +41,14 @@ import scala.util.control.NoStackTrace * * @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]] */ -private[flow] final class StreamPublisher[F[_], A] private ( - stream: Stream[F, A], - runSubscription: F[Unit] => Unit -)(implicit F: Async[F]) - extends Publisher[A] { - override def subscribe(subscriber: Subscriber[_ >: A]): Unit = { +private[flow] sealed abstract class StreamPublisher[F[_], A] private ( + stream: Stream[F, A] +)(implicit + F: Async[F] +) extends Publisher[A] { + protected def runSubscription(subscribe: F[Unit]): Unit + + override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = { requireNonNull( subscriber, "The subscriber provided to subscribe must not be null" @@ -67,23 +69,40 @@ private[flow] final class StreamPublisher[F[_], A] private ( } private[flow] object StreamPublisher { + private final class DispatcherStreamPublisher[F[_], A]( + stream: Stream[F, A], + dispatcher: Dispatcher[F] + )(implicit + F: Async[F] + ) extends StreamPublisher[F, A](stream) { + override protected final def runSubscription(subscribe: F[Unit]): Unit = + dispatcher.unsafeRunAndForget(subscribe) + } + + private final class IORuntimeStreamPublisher[A]( + stream: Stream[IO, A] + )(implicit + runtime: IORuntime + ) extends StreamPublisher[IO, A](stream) { + override protected final def runSubscription(subscribe: IO[Unit]): Unit = + subscribe.unsafeRunAndForget() + } + def apply[F[_], A]( stream: Stream[F, A] - )(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] = + )(implicit + F: Async[F] + ): Resource[F, StreamPublisher[F, A]] = Dispatcher.parallel[F](await = false).map { startDispatcher => - new StreamPublisher( - stream, - runSubscription = startDispatcher.unsafeRunAndForget - ) + new DispatcherStreamPublisher(stream, startDispatcher) } def unsafe[A]( stream: Stream[IO, A] - )(implicit runtime: IORuntime): StreamPublisher[IO, A] = - new StreamPublisher( - stream, - runSubscription = _.unsafeRunAndForget() - ) + )(implicit + runtime: IORuntime + ): StreamPublisher[IO, A] = + new IORuntimeStreamPublisher(stream) private object CanceledStreamPublisherException extends IllegalStateException( From 2f6249442492864e9bdf7a740026e1084a7722c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 13:06:18 -0500 Subject: [PATCH 06/10] Handle RejectedExecutionException inside StreamPublisher --- .../src/main/scala/fs2/interop/flow/StreamPublisher.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 9727b055fb..0abc5bb4d1 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -30,6 +30,7 @@ import cats.effect.unsafe.IORuntime import java.util.Objects.requireNonNull import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} +import java.util.concurrent.RejectedExecutionException import scala.util.control.NoStackTrace /** Implementation of a [[Publisher]]. @@ -58,7 +59,7 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private ( StreamSubscription.subscribe(stream, subscriber) ) catch { - case _: IllegalStateException => + case _: IllegalStateException | _: RejectedExecutionException => subscriber.onSubscribe(new Subscription { override def cancel(): Unit = () override def request(x$1: Long): Unit = () From f326b98372ff5e049835dc4b56707c02e15a82d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 13:27:25 -0500 Subject: [PATCH 07/10] Fix flow.syntax Scaladoc --- core/shared/src/main/scala/fs2/interop/flow/syntax.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index eaa5796f70..1833b0c099 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -99,8 +99,6 @@ object syntax { * each [[Subscription]] will re-run the [[Stream]] from the beginning. * * @see [[toPublisher]] for a safe version that returns a [[Resource]]. - * - * @param stream The [[Stream]] to transform. */ def unsafeToPublisher()(implicit runtime: IORuntime From 3f2cab82c85b72799c0e9b6a3c13e6fc86d31a5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Wed, 22 Nov 2023 19:06:56 -0500 Subject: [PATCH 08/10] Minor improvements --- .../fs2/interop/flow/StreamSubscriber.scala | 28 +++++++++---------- .../fs2/interop/flow/StreamSubscription.scala | 8 +++--- .../main/scala/fs2/interop/flow/package.scala | 4 +-- .../main/scala/fs2/interop/flow/syntax.scala | 4 ++- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala index 39f5bf0caa..f0a4dfb588 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala @@ -50,35 +50,35 @@ private[flow] final class StreamSubscriber[F[_], A] private ( // Subscriber API. /** Receives a subscription from the upstream reactive-streams system. */ - override def onSubscribe(subscription: Subscription): Unit = { + override final def onSubscribe(subscription: Subscription): Unit = { requireNonNull( subscription, "The subscription provided to onSubscribe must not be null" ) - nextState(Subscribe(subscription)) + nextState(input = Subscribe(subscription)) } /** Receives the next record from the upstream reactive-streams system. */ - override def onNext(a: A): Unit = { + override final def onNext(a: A): Unit = { requireNonNull( a, "The element provided to onNext must not be null" ) - nextState(Next(a)) + nextState(input = Next(a)) } /** Called by the upstream reactive-streams system when it fails. */ - override def onError(ex: Throwable): Unit = { + override final def onError(ex: Throwable): Unit = { requireNonNull( ex, "The throwable provided to onError must not be null" ) - nextState(Error(ex)) + nextState(input = Error(ex)) } /** Called by the upstream reactive-streams system when it has finished sending records. */ - override def onComplete(): Unit = - nextState(Complete(canceled = false)) + override final def onComplete(): Unit = + nextState(input = Complete(canceled = false)) // Interop API. @@ -86,13 +86,13 @@ private[flow] final class StreamSubscriber[F[_], A] private ( private[flow] def stream(subscribe: F[Unit]): Stream[F, A] = { // Called when downstream has finished consuming records. val finalize = - F.delay(nextState(Complete(canceled = true))) + F.delay(nextState(input = Complete(canceled = true))) // Producer for downstream. val dequeue1 = F.async[Option[Chunk[Any]]] { cb => F.delay { - nextState(Dequeue(cb)) + nextState(input = Dequeue(cb)) Some(finalize) } @@ -112,8 +112,8 @@ private[flow] final class StreamSubscriber[F[_], A] private ( private def run(block: => Unit): () => Unit = () => block /** Runs a single step of the state machine. */ - private def step(in: Input): State => (State, () => Unit) = - in match { + private def step(input: Input): State => (State, () => Unit) = + input match { case Subscribe(s) => { case Uninitialized(None) => Idle(s) -> noop @@ -263,9 +263,9 @@ private[flow] final class StreamSubscriber[F[_], A] private ( * + `Error` & `Dequeue`: No matter the order in which they are processed, we will complete the callback with the error. * + cancellation & any other thing: Worst case, we will lose some data that we not longer care about; and eventually reach `Terminal`. */ - private def nextState(in: Input): Unit = { + private def nextState(input: Input): Unit = { val (_, effect) = currentState.updateAndGet { case (state, _) => - step(in)(state) + step(input)(state) } // Only run the effect after the state update took place. effect() diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala index 3129ea2d96..f482559764 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} /** Implementation of a [[Subscription]]. * - * This is used by the [[StreamUnicastPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system. + * This is used by the [[StreamPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system. * * @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]] */ @@ -58,7 +58,7 @@ private[flow] final class StreamSubscription[F[_], A] private ( sub.onComplete() } - private[flow] def run: F[Unit] = { + val run: F[Unit] = { val subscriptionPipe: Pipe[F, A, A] = in => { def go(s: Stream[F, A]): Pull[F, A, Unit] = Pull.eval(F.delay(requests.get())).flatMap { n => @@ -133,14 +133,14 @@ private[flow] final class StreamSubscription[F[_], A] private ( // then the request must be a NOOP. // See https://github.com/zainab-ali/fs2-reactive-streams/issues/29 // and https://github.com/zainab-ali/fs2-reactive-streams/issues/46 - override def cancel(): Unit = { + override final def cancel(): Unit = { val cancelCB = canceled.getAndSet(null) if (cancelCB ne null) { cancelCB.apply() } } - override def request(n: Long): Unit = + override final def request(n: Long): Unit = // First, confirm we are not yet cancelled. if (canceled.get() ne null) { // Second, ensure we were requested a positive number of elements. diff --git a/core/shared/src/main/scala/fs2/interop/flow/package.scala b/core/shared/src/main/scala/fs2/interop/flow/package.scala index 64020b1c7f..de49e640c0 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/package.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/package.scala @@ -102,7 +102,7 @@ package object flow { subscriber.stream(subscribe(subscriber)) } - /** Creates a [[Stream]] from an [[Publisher]]. + /** Creates a [[Stream]] from a [[Publisher]]. * * @example {{{ * scala> import cats.effect.IO @@ -118,7 +118,7 @@ package object flow { * res0: Stream[IO, Int] = Stream(..) * }}} * - * @note The publisher will not receive a subscriber until the stream is run. + * @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run. * * @see the `toStream` extension method added to `Publisher` * diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index 1833b0c099..72513d93a5 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -32,7 +32,7 @@ import java.util.concurrent.Flow.{Publisher, Subscriber} object syntax { implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { - /** Creates a [[Stream]] from an [[Publisher]]. + /** Creates a [[Stream]] from a [[Publisher]]. * * @example {{{ * scala> import cats.effect.IO @@ -49,6 +49,8 @@ object syntax { * res0: Stream[IO, Int] = Stream(..) * }}} * + * @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run. + * * @param chunkSize setup the number of elements asked each time from the [[Publisher]]. * A high number may be useful if the publisher is triggering from IO, * like requesting elements from a database. From 5c766b903c33c60464cced06060c2c1b148d146f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Fri, 24 Nov 2023 10:37:16 -0500 Subject: [PATCH 09/10] Make StreamSubscription.run a def (again) --- .../src/main/scala/fs2/interop/flow/StreamSubscription.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala index f482559764..9063f4fe3d 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala @@ -58,7 +58,9 @@ private[flow] final class StreamSubscription[F[_], A] private ( sub.onComplete() } - val run: F[Unit] = { + // This is a def rather than a val, because it is only used once. + // And having fields increase the instantiation cost and delay garbage collection. + def run: F[Unit] = { val subscriptionPipe: Pipe[F, A, A] = in => { def go(s: Stream[F, A]): Pull[F, A, Unit] = Pull.eval(F.delay(requests.get())).flatMap { n => From b5e91af393903583ba025121277d349e2f0d794e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Miguel=20Mej=C3=ADa=20Su=C3=A1rez?= Date: Fri, 24 Nov 2023 11:05:10 -0500 Subject: [PATCH 10/10] Wait for Dispatcher in StreamPublisher --- .../src/main/scala/fs2/interop/flow/StreamPublisher.scala | 6 +++--- core/shared/src/main/scala/fs2/interop/flow/package.scala | 4 +++- core/shared/src/main/scala/fs2/interop/flow/syntax.scala | 4 +++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 0abc5bb4d1..b20b8d383d 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -72,12 +72,12 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private ( private[flow] object StreamPublisher { private final class DispatcherStreamPublisher[F[_], A]( stream: Stream[F, A], - dispatcher: Dispatcher[F] + startDispatcher: Dispatcher[F] )(implicit F: Async[F] ) extends StreamPublisher[F, A](stream) { override protected final def runSubscription(subscribe: F[Unit]): Unit = - dispatcher.unsafeRunAndForget(subscribe) + startDispatcher.unsafeRunAndForget(subscribe) } private final class IORuntimeStreamPublisher[A]( @@ -94,7 +94,7 @@ private[flow] object StreamPublisher { )(implicit F: Async[F] ): Resource[F, StreamPublisher[F, A]] = - Dispatcher.parallel[F](await = false).map { startDispatcher => + Dispatcher.parallel[F](await = true).map { startDispatcher => new DispatcherStreamPublisher(stream, startDispatcher) } diff --git a/core/shared/src/main/scala/fs2/interop/flow/package.scala b/core/shared/src/main/scala/fs2/interop/flow/package.scala index de49e640c0..25eba9329f 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/package.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/package.scala @@ -136,7 +136,9 @@ package object flow { /** Creates a [[Publisher]] from a [[Stream]]. * * The stream is only ran when elements are requested. - * Closing the [[Resource]] means gracefully shutting down all active subscriptions. + * Closing the [[Resource]] means not accepting new subscriptions, + * but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. * Thus, no more elements will be published. * * @note This [[Publisher]] can be reused for multiple [[Subscribers]], diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index 72513d93a5..5c6ee97004 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -67,7 +67,9 @@ object syntax { /** Creates a [[Publisher]] from a [[Stream]]. * * The stream is only ran when elements are requested. - * Closing the [[Resource]] means gracefully shutting down all active subscriptions. + * Closing the [[Resource]] means not accepting new subscriptions, + * but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. * Thus, no more elements will be published. * * @note This [[Publisher]] can be reused for multiple [[Subscribers]],