diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala index 3cc596f2a2..b20b8d383d 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala @@ -23,11 +23,14 @@ package fs2 package interop package flow +import cats.effect.IO import cats.effect.kernel.{Async, Resource} import cats.effect.std.Dispatcher +import cats.effect.unsafe.IORuntime import java.util.Objects.requireNonNull import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription} +import java.util.concurrent.RejectedExecutionException import scala.util.control.NoStackTrace /** Implementation of a [[Publisher]]. @@ -39,22 +42,24 @@ import scala.util.control.NoStackTrace * * @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]] */ -private[flow] final class StreamPublisher[F[_], A] private ( - stream: Stream[F, A], - startDispatcher: Dispatcher[F] -)(implicit F: Async[F]) - extends Publisher[A] { - override def subscribe(subscriber: Subscriber[_ >: A]): Unit = { +private[flow] sealed abstract class StreamPublisher[F[_], A] private ( + stream: Stream[F, A] +)(implicit + F: Async[F] +) extends Publisher[A] { + protected def runSubscription(subscribe: F[Unit]): Unit + + override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = { requireNonNull( subscriber, "The subscriber provided to subscribe must not be null" ) try - startDispatcher.unsafeRunAndForget( + runSubscription( StreamSubscription.subscribe(stream, subscriber) ) catch { - case _: IllegalStateException => + case _: IllegalStateException | _: RejectedExecutionException => subscriber.onSubscribe(new Subscription { override def cancel(): Unit = () override def request(x$1: Long): Unit = () @@ -65,13 +70,41 @@ private[flow] final class StreamPublisher[F[_], A] private ( } private[flow] object StreamPublisher { + private final class DispatcherStreamPublisher[F[_], A]( + stream: Stream[F, A], + startDispatcher: Dispatcher[F] + )(implicit + F: Async[F] + ) extends StreamPublisher[F, A](stream) { + override protected final def runSubscription(subscribe: F[Unit]): Unit = + startDispatcher.unsafeRunAndForget(subscribe) + } + + private final class IORuntimeStreamPublisher[A]( + stream: Stream[IO, A] + )(implicit + runtime: IORuntime + ) extends StreamPublisher[IO, A](stream) { + override protected final def runSubscription(subscribe: IO[Unit]): Unit = + subscribe.unsafeRunAndForget() + } + def apply[F[_], A]( stream: Stream[F, A] - )(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] = - Dispatcher.parallel[F](await = false).map { startDispatcher => - new StreamPublisher(stream, startDispatcher) + )(implicit + F: Async[F] + ): Resource[F, StreamPublisher[F, A]] = + Dispatcher.parallel[F](await = true).map { startDispatcher => + new DispatcherStreamPublisher(stream, startDispatcher) } + def unsafe[A]( + stream: Stream[IO, A] + )(implicit + runtime: IORuntime + ): StreamPublisher[IO, A] = + new IORuntimeStreamPublisher(stream) + private object CanceledStreamPublisherException extends IllegalStateException( "This StreamPublisher is not longer accepting subscribers" diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala index 39f5bf0caa..f0a4dfb588 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala @@ -50,35 +50,35 @@ private[flow] final class StreamSubscriber[F[_], A] private ( // Subscriber API. /** Receives a subscription from the upstream reactive-streams system. */ - override def onSubscribe(subscription: Subscription): Unit = { + override final def onSubscribe(subscription: Subscription): Unit = { requireNonNull( subscription, "The subscription provided to onSubscribe must not be null" ) - nextState(Subscribe(subscription)) + nextState(input = Subscribe(subscription)) } /** Receives the next record from the upstream reactive-streams system. */ - override def onNext(a: A): Unit = { + override final def onNext(a: A): Unit = { requireNonNull( a, "The element provided to onNext must not be null" ) - nextState(Next(a)) + nextState(input = Next(a)) } /** Called by the upstream reactive-streams system when it fails. */ - override def onError(ex: Throwable): Unit = { + override final def onError(ex: Throwable): Unit = { requireNonNull( ex, "The throwable provided to onError must not be null" ) - nextState(Error(ex)) + nextState(input = Error(ex)) } /** Called by the upstream reactive-streams system when it has finished sending records. */ - override def onComplete(): Unit = - nextState(Complete(canceled = false)) + override final def onComplete(): Unit = + nextState(input = Complete(canceled = false)) // Interop API. @@ -86,13 +86,13 @@ private[flow] final class StreamSubscriber[F[_], A] private ( private[flow] def stream(subscribe: F[Unit]): Stream[F, A] = { // Called when downstream has finished consuming records. val finalize = - F.delay(nextState(Complete(canceled = true))) + F.delay(nextState(input = Complete(canceled = true))) // Producer for downstream. val dequeue1 = F.async[Option[Chunk[Any]]] { cb => F.delay { - nextState(Dequeue(cb)) + nextState(input = Dequeue(cb)) Some(finalize) } @@ -112,8 +112,8 @@ private[flow] final class StreamSubscriber[F[_], A] private ( private def run(block: => Unit): () => Unit = () => block /** Runs a single step of the state machine. */ - private def step(in: Input): State => (State, () => Unit) = - in match { + private def step(input: Input): State => (State, () => Unit) = + input match { case Subscribe(s) => { case Uninitialized(None) => Idle(s) -> noop @@ -263,9 +263,9 @@ private[flow] final class StreamSubscriber[F[_], A] private ( * + `Error` & `Dequeue`: No matter the order in which they are processed, we will complete the callback with the error. * + cancellation & any other thing: Worst case, we will lose some data that we not longer care about; and eventually reach `Terminal`. */ - private def nextState(in: Input): Unit = { + private def nextState(input: Input): Unit = { val (_, effect) = currentState.updateAndGet { case (state, _) => - step(in)(state) + step(input)(state) } // Only run the effect after the state update took place. effect() diff --git a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala index 3129ea2d96..9063f4fe3d 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/StreamSubscription.scala @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} /** Implementation of a [[Subscription]]. * - * This is used by the [[StreamUnicastPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system. + * This is used by the [[StreamPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system. * * @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]] */ @@ -58,7 +58,9 @@ private[flow] final class StreamSubscription[F[_], A] private ( sub.onComplete() } - private[flow] def run: F[Unit] = { + // This is a def rather than a val, because it is only used once. + // And having fields increase the instantiation cost and delay garbage collection. + def run: F[Unit] = { val subscriptionPipe: Pipe[F, A, A] = in => { def go(s: Stream[F, A]): Pull[F, A, Unit] = Pull.eval(F.delay(requests.get())).flatMap { n => @@ -133,14 +135,14 @@ private[flow] final class StreamSubscription[F[_], A] private ( // then the request must be a NOOP. // See https://github.com/zainab-ali/fs2-reactive-streams/issues/29 // and https://github.com/zainab-ali/fs2-reactive-streams/issues/46 - override def cancel(): Unit = { + override final def cancel(): Unit = { val cancelCB = canceled.getAndSet(null) if (cancelCB ne null) { cancelCB.apply() } } - override def request(n: Long): Unit = + override final def request(n: Long): Unit = // First, confirm we are not yet cancelled. if (canceled.get() ne null) { // Second, ensure we were requested a positive number of elements. diff --git a/core/shared/src/main/scala/fs2/interop/flow/package.scala b/core/shared/src/main/scala/fs2/interop/flow/package.scala index 50a9d64f9c..25eba9329f 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/package.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/package.scala @@ -22,7 +22,9 @@ package fs2 package interop +import cats.effect.IO import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize} @@ -100,7 +102,7 @@ package object flow { subscriber.stream(subscribe(subscriber)) } - /** Creates a [[Stream]] from an [[Publisher]]. + /** Creates a [[Stream]] from a [[Publisher]]. * * @example {{{ * scala> import cats.effect.IO @@ -116,7 +118,7 @@ package object flow { * res0: Stream[IO, Int] = Stream(..) * }}} * - * @note The publisher will not receive a subscriber until the stream is run. + * @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run. * * @see the `toStream` extension method added to `Publisher` * @@ -134,12 +136,15 @@ package object flow { /** Creates a [[Publisher]] from a [[Stream]]. * * The stream is only ran when elements are requested. - * Closing the [[Resource]] means gracefully shutting down all active subscriptions. + * Closing the [[Resource]] means not accepting new subscriptions, + * but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. * Thus, no more elements will be published. * - * @note This Publisher can be reused for multiple Subscribers, - * each subscription will re-run the [[Stream]] from the beginning. + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. * + * @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. * @see [[subscribeStream]] for a simpler version that only requires a [[Subscriber]]. * * @param stream The [[Stream]] to transform. @@ -151,6 +156,24 @@ package object flow { ): Resource[F, Publisher[A]] = StreamPublisher(stream) + /** Creates a [[Publisher]] from a [[Stream]]. + * + * The stream is only ran when elements are requested. + * + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. + * + * @see [[toPublisher]] for a safe version that returns a [[Resource]]. + * + * @param stream The [[Stream]] to transform. + */ + def unsafeToPublisher[A]( + stream: Stream[IO, A] + )(implicit + runtime: IORuntime + ): Publisher[A] = + StreamPublisher.unsafe(stream) + /** Allows subscribing a [[Subscriber]] to a [[Stream]]. * * The returned program will run until diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index 99fa0e23c7..5c6ee97004 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -23,14 +23,16 @@ package fs2 package interop package flow +import cats.effect.IO import cats.effect.kernel.{Async, Resource} +import cats.effect.unsafe.IORuntime import java.util.concurrent.Flow.{Publisher, Subscriber} object syntax { implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { - /** Creates a [[Stream]] from an [[Publisher]]. + /** Creates a [[Stream]] from a [[Publisher]]. * * @example {{{ * scala> import cats.effect.IO @@ -47,6 +49,8 @@ object syntax { * res0: Stream[IO, Int] = Stream(..) * }}} * + * @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run. + * * @param chunkSize setup the number of elements asked each time from the [[Publisher]]. * A high number may be useful if the publisher is triggering from IO, * like requesting elements from a database. @@ -63,12 +67,15 @@ object syntax { /** Creates a [[Publisher]] from a [[Stream]]. * * The stream is only ran when elements are requested. - * Closing the [[Resource]] means gracefully shutting down all active subscriptions. + * Closing the [[Resource]] means not accepting new subscriptions, + * but waiting for all active ones to finish consuming. + * Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions. * Thus, no more elements will be published. * - * @note This Publisher can be reused for multiple Subscribers, - * each subscription will re-run the [[Stream]] from the beginning. + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. * + * @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]]. * @see [[subscribe]] for a simpler version that only requires a [[Subscriber]]. */ def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] = @@ -86,6 +93,23 @@ object syntax { flow.subscribeStream(stream, subscriber) } + implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal { + + /** Creates a [[Publisher]] from a [[Stream]]. + * + * The stream is only ran when elements are requested. + * + * @note This [[Publisher]] can be reused for multiple [[Subscribers]], + * each [[Subscription]] will re-run the [[Stream]] from the beginning. + * + * @see [[toPublisher]] for a safe version that returns a [[Resource]]. + */ + def unsafeToPublisher()(implicit + runtime: IORuntime + ): Publisher[A] = + flow.unsafeToPublisher(stream) + } + final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A],