Skip to content

Commit

Permalink
Wait for Dispatcher in StreamPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Nov 24, 2023
1 parent 5c766b9 commit b5e91af
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private (
private[flow] object StreamPublisher {
private final class DispatcherStreamPublisher[F[_], A](
stream: Stream[F, A],
dispatcher: Dispatcher[F]
startDispatcher: Dispatcher[F]
)(implicit
F: Async[F]
) extends StreamPublisher[F, A](stream) {
override protected final def runSubscription(subscribe: F[Unit]): Unit =
dispatcher.unsafeRunAndForget(subscribe)
startDispatcher.unsafeRunAndForget(subscribe)
}

private final class IORuntimeStreamPublisher[A](
Expand All @@ -94,7 +94,7 @@ private[flow] object StreamPublisher {
)(implicit
F: Async[F]
): Resource[F, StreamPublisher[F, A]] =
Dispatcher.parallel[F](await = false).map { startDispatcher =>
Dispatcher.parallel[F](await = true).map { startDispatcher =>
new DispatcherStreamPublisher(stream, startDispatcher)
}

Expand Down
4 changes: 3 additions & 1 deletion core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ package object flow {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
Expand Down
4 changes: 3 additions & 1 deletion core/shared/src/main/scala/fs2/interop/flow/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ object syntax {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Closing the [[Resource]] means not accepting new subscriptions,
* but waiting for all active ones to finish consuming.
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This [[Publisher]] can be reused for multiple [[Subscribers]],
Expand Down

0 comments on commit b5e91af

Please sign in to comment.