Skip to content

Commit

Permalink
Merge pull request #3342 from BalmungSan/unsafe-to-publisher
Browse files Browse the repository at this point in the history
Add flow.unsafeToPublisher
  • Loading branch information
armanbilge authored Nov 24, 2023
2 parents 53ba75c + b5e91af commit 00fb259
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 38 deletions.
55 changes: 44 additions & 11 deletions core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ package fs2
package interop
package flow

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.IORuntime

import java.util.Objects.requireNonNull
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import java.util.concurrent.RejectedExecutionException
import scala.util.control.NoStackTrace

/** Implementation of a [[Publisher]].
Expand All @@ -39,22 +42,24 @@ import scala.util.control.NoStackTrace
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
*/
private[flow] final class StreamPublisher[F[_], A] private (
stream: Stream[F, A],
startDispatcher: Dispatcher[F]
)(implicit F: Async[F])
extends Publisher[A] {
override def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
private[flow] sealed abstract class StreamPublisher[F[_], A] private (
stream: Stream[F, A]
)(implicit
F: Async[F]
) extends Publisher[A] {
protected def runSubscription(subscribe: F[Unit]): Unit

override final def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
requireNonNull(
subscriber,
"The subscriber provided to subscribe must not be null"
)
try
startDispatcher.unsafeRunAndForget(
runSubscription(
StreamSubscription.subscribe(stream, subscriber)
)
catch {
case _: IllegalStateException =>
case _: IllegalStateException | _: RejectedExecutionException =>
subscriber.onSubscribe(new Subscription {
override def cancel(): Unit = ()
override def request(x$1: Long): Unit = ()
Expand All @@ -65,13 +70,41 @@ private[flow] final class StreamPublisher[F[_], A] private (
}

private[flow] object StreamPublisher {
private final class DispatcherStreamPublisher[F[_], A](
stream: Stream[F, A],
startDispatcher: Dispatcher[F]
)(implicit
F: Async[F]
) extends StreamPublisher[F, A](stream) {
override protected final def runSubscription(subscribe: F[Unit]): Unit =
startDispatcher.unsafeRunAndForget(subscribe)
}

private final class IORuntimeStreamPublisher[A](
stream: Stream[IO, A]
)(implicit
runtime: IORuntime
) extends StreamPublisher[IO, A](stream) {
override protected final def runSubscription(subscribe: IO[Unit]): Unit =
subscribe.unsafeRunAndForget()
}

def apply[F[_], A](
stream: Stream[F, A]
)(implicit F: Async[F]): Resource[F, StreamPublisher[F, A]] =
Dispatcher.parallel[F](await = false).map { startDispatcher =>
new StreamPublisher(stream, startDispatcher)
)(implicit
F: Async[F]
): Resource[F, StreamPublisher[F, A]] =
Dispatcher.parallel[F](await = true).map { startDispatcher =>
new DispatcherStreamPublisher(stream, startDispatcher)
}

def unsafe[A](
stream: Stream[IO, A]
)(implicit
runtime: IORuntime
): StreamPublisher[IO, A] =
new IORuntimeStreamPublisher(stream)

private object CanceledStreamPublisherException
extends IllegalStateException(
"This StreamPublisher is not longer accepting subscribers"
Expand Down
28 changes: 14 additions & 14 deletions core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,49 @@ private[flow] final class StreamSubscriber[F[_], A] private (
// Subscriber API.

/** Receives a subscription from the upstream reactive-streams system. */
override def onSubscribe(subscription: Subscription): Unit = {
override final def onSubscribe(subscription: Subscription): Unit = {
requireNonNull(
subscription,
"The subscription provided to onSubscribe must not be null"
)
nextState(Subscribe(subscription))
nextState(input = Subscribe(subscription))
}

/** Receives the next record from the upstream reactive-streams system. */
override def onNext(a: A): Unit = {
override final def onNext(a: A): Unit = {
requireNonNull(
a,
"The element provided to onNext must not be null"
)
nextState(Next(a))
nextState(input = Next(a))
}

/** Called by the upstream reactive-streams system when it fails. */
override def onError(ex: Throwable): Unit = {
override final def onError(ex: Throwable): Unit = {
requireNonNull(
ex,
"The throwable provided to onError must not be null"
)
nextState(Error(ex))
nextState(input = Error(ex))
}

/** Called by the upstream reactive-streams system when it has finished sending records. */
override def onComplete(): Unit =
nextState(Complete(canceled = false))
override final def onComplete(): Unit =
nextState(input = Complete(canceled = false))

// Interop API.

/** Creates a downstream [[Stream]] from this [[Subscriber]]. */
private[flow] def stream(subscribe: F[Unit]): Stream[F, A] = {
// Called when downstream has finished consuming records.
val finalize =
F.delay(nextState(Complete(canceled = true)))
F.delay(nextState(input = Complete(canceled = true)))

// Producer for downstream.
val dequeue1 =
F.async[Option[Chunk[Any]]] { cb =>
F.delay {
nextState(Dequeue(cb))
nextState(input = Dequeue(cb))

Some(finalize)
}
Expand All @@ -112,8 +112,8 @@ private[flow] final class StreamSubscriber[F[_], A] private (
private def run(block: => Unit): () => Unit = () => block

/** Runs a single step of the state machine. */
private def step(in: Input): State => (State, () => Unit) =
in match {
private def step(input: Input): State => (State, () => Unit) =
input match {
case Subscribe(s) => {
case Uninitialized(None) =>
Idle(s) -> noop
Expand Down Expand Up @@ -263,9 +263,9 @@ private[flow] final class StreamSubscriber[F[_], A] private (
* + `Error` & `Dequeue`: No matter the order in which they are processed, we will complete the callback with the error.
* + cancellation & any other thing: Worst case, we will lose some data that we not longer care about; and eventually reach `Terminal`.
*/
private def nextState(in: Input): Unit = {
private def nextState(input: Input): Unit = {
val (_, effect) = currentState.updateAndGet { case (state, _) =>
step(in)(state)
step(input)(state)
}
// Only run the effect after the state update took place.
effect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

/** Implementation of a [[Subscription]].
*
* This is used by the [[StreamUnicastPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
* This is used by the [[StreamPublisher]] to send elements from a [[Stream]] to a downstream reactive-streams system.
*
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#3-subscription-code]]
*/
Expand All @@ -58,7 +58,9 @@ private[flow] final class StreamSubscription[F[_], A] private (
sub.onComplete()
}

private[flow] def run: F[Unit] = {
// This is a def rather than a val, because it is only used once.
// And having fields increase the instantiation cost and delay garbage collection.
def run: F[Unit] = {
val subscriptionPipe: Pipe[F, A, A] = in => {
def go(s: Stream[F, A]): Pull[F, A, Unit] =
Pull.eval(F.delay(requests.get())).flatMap { n =>
Expand Down Expand Up @@ -133,14 +135,14 @@ private[flow] final class StreamSubscription[F[_], A] private (
// then the request must be a NOOP.
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46
override def cancel(): Unit = {
override final def cancel(): Unit = {
val cancelCB = canceled.getAndSet(null)
if (cancelCB ne null) {
cancelCB.apply()
}
}

override def request(n: Long): Unit =
override final def request(n: Long): Unit =
// First, confirm we are not yet cancelled.
if (canceled.get() ne null) {
// Second, ensure we were requested a positive number of elements.
Expand Down
33 changes: 28 additions & 5 deletions core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
package fs2
package interop

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}

Expand Down Expand Up @@ -100,7 +102,7 @@ package object flow {
subscriber.stream(subscribe(subscriber))
}

/** Creates a [[Stream]] from an [[Publisher]].
/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
Expand All @@ -116,7 +118,7 @@ package object flow {
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The publisher will not receive a subscriber until the stream is run.
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @see the `toStream` extension method added to `Publisher`
*
Expand All @@ -134,12 +136,15 @@ package object flow {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This Publisher can be reused for multiple Subscribers,
* each subscription will re-run the [[Stream]] from the beginning.
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribeStream]] for a simpler version that only requires a [[Subscriber]].
*
* @param stream The [[Stream]] to transform.
Expand All @@ -151,6 +156,24 @@ package object flow {
): Resource[F, Publisher[A]] =
StreamPublisher(stream)

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
*
* @param stream The [[Stream]] to transform.
*/
def unsafeToPublisher[A](
stream: Stream[IO, A]
)(implicit
runtime: IORuntime
): Publisher[A] =
StreamPublisher.unsafe(stream)

/** Allows subscribing a [[Subscriber]] to a [[Stream]].
*
* The returned program will run until
Expand Down
32 changes: 28 additions & 4 deletions core/shared/src/main/scala/fs2/interop/flow/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ package fs2
package interop
package flow

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {

/** Creates a [[Stream]] from an [[Publisher]].
/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
Expand All @@ -47,6 +49,8 @@ object syntax {
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
Expand All @@ -63,12 +67,15 @@ object syntax {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This Publisher can be reused for multiple Subscribers,
* each subscription will re-run the [[Stream]] from the beginning.
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[unsafeToPublisher]] for an unsafe version that returns a plain [[Publisher]].
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] =
Expand All @@ -86,6 +93,23 @@ object syntax {
flow.subscribeStream(stream, subscriber)
}

implicit final class StreamIOOps[A](private val stream: Stream[IO, A]) extends AnyVal {

/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
* each [[Subscription]] will re-run the [[Stream]] from the beginning.
*
* @see [[toPublisher]] for a safe version that returns a [[Resource]].
*/
def unsafeToPublisher()(implicit
runtime: IORuntime
): Publisher[A] =
flow.unsafeToPublisher(stream)
}

final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[A](
publisher: Publisher[A],
Expand Down

0 comments on commit 00fb259

Please sign in to comment.