Skip to content

Commit

Permalink
Merge pull request #3322 from armanbilge/topic/js-stream-destroy
Browse files Browse the repository at this point in the history
Implement Node.js stream `destroy` w/ `interruptWhen`
  • Loading branch information
mpilquist authored Oct 27, 2023
2 parents e1e69e7 + 2bd4506 commit 62d25fa
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
1 change: 1 addition & 0 deletions io/js/src/main/scala/fs2/io/NodeStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ trait Duplex extends Readable with Writable {
protected[io] override def destroy(): this.type = js.native
}

@deprecated("No longer raised", "3.9.3")
final class StreamDestroyedException private[io] () extends IOException
52 changes: 25 additions & 27 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,30 @@ private[fs2] trait ioplatform {
/** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
* that ends whenever the resulting stream terminates.
*/
def toReadable[F[_]](implicit F: Async[F]): Pipe[F, Byte, Readable] =
in =>
Stream
.resource(mkDuplex(in))
.flatMap { case (duplex, out) =>
Stream
.emit(duplex)
.merge(out.drain)
.concurrently(
Stream.eval(
F.async_[Unit](cb =>
duplex.end { e =>
cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException))
}
)
)
)
}
.adaptError { case IOException(ex) => ex }
def toReadable[F[_]: Async]: Pipe[F, Byte, Readable] =
in => Stream.resource(toReadableResource(in))

/** Like [[toReadable]] but returns a `Resource` rather than a single element stream.
*/
def toReadableResource[F[_]: Async](s: Stream[F, Byte]): Resource[F, Readable] =
s.through(toReadable).compile.resource.lastOrError
def toReadableResource[F[_]](s: Stream[F, Byte])(implicit F: Async[F]): Resource[F, Readable] =
mkDuplex(s)
.flatMap { case (duplex, out) =>
out
.concurrently(
Stream.eval(
F.async_[Unit](cb =>
duplex.end { e =>
cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException))
}
)
)
)
.compile
.drain
.background
.as(duplex)
}
.adaptError { case IOException(ex) => ex }

/** Writes all bytes to the specified `Writable`.
*/
Expand Down Expand Up @@ -206,7 +206,7 @@ private[fs2] trait ioplatform {
errorDispatcher <- Dispatcher.sequential[F]
readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource
writeChannel <- Channel.synchronous[F, Chunk[Byte]].toResource
error <- F.deferred[Throwable].toResource
interrupt <- F.deferred[Either[Throwable, Unit]].toResource
duplex <- Resource.make {
F.delay {
new facade.stream.Duplex(
Expand Down Expand Up @@ -236,10 +236,9 @@ private[fs2] trait ioplatform {

var destroy = { (_, err, cb) =>
errorDispatcher.unsafeRunAndForget {
error
interrupt
.complete(
Option(err)
.fold[Exception](new StreamDestroyedException)(js.JavaScriptException(_))
Option(err).map(js.JavaScriptException(_)).toLeft(())
) *> F.delay(cb(null))
}
}
Expand All @@ -254,10 +253,9 @@ private[fs2] trait ioplatform {
}
drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain
out = writeChannel.stream.unchunks
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit])))
} yield (
duplex,
drainIn.merge(out).adaptError { case IOException(ex) => ex }
drainIn.merge(out).interruptWhen(interrupt).adaptError { case IOException(ex) => ex }
)

/** Stream of bytes read asynchronously from standard input. */
Expand Down
12 changes: 12 additions & 0 deletions io/js/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,16 @@ class IoPlatformSuite extends Fs2Suite {
.timeoutTo(100.millis, IO.unit)
}

test("Destroying Node.js stream without error does not raise an exception") {
Stream
.never[IO]
.through {
toDuplexAndRead[IO] { duplex =>
IO(duplex.destroy())
}
}
.compile
.drain
}

}

0 comments on commit 62d25fa

Please sign in to comment.