diff --git a/io/js/src/main/scala/fs2/io/NodeStream.scala b/io/js/src/main/scala/fs2/io/NodeStream.scala index 99213a666f..fc8c92277f 100644 --- a/io/js/src/main/scala/fs2/io/NodeStream.scala +++ b/io/js/src/main/scala/fs2/io/NodeStream.scala @@ -72,4 +72,5 @@ trait Duplex extends Readable with Writable { protected[io] override def destroy(): this.type = js.native } +@deprecated("No longer raised", "3.9.3") final class StreamDestroyedException private[io] () extends IOException diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index 652755a497..011f770764 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -109,30 +109,30 @@ private[fs2] trait ioplatform { /** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`, * that ends whenever the resulting stream terminates. */ - def toReadable[F[_]](implicit F: Async[F]): Pipe[F, Byte, Readable] = - in => - Stream - .resource(mkDuplex(in)) - .flatMap { case (duplex, out) => - Stream - .emit(duplex) - .merge(out.drain) - .concurrently( - Stream.eval( - F.async_[Unit](cb => - duplex.end { e => - cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException)) - } - ) - ) - ) - } - .adaptError { case IOException(ex) => ex } + def toReadable[F[_]: Async]: Pipe[F, Byte, Readable] = + in => Stream.resource(toReadableResource(in)) /** Like [[toReadable]] but returns a `Resource` rather than a single element stream. */ - def toReadableResource[F[_]: Async](s: Stream[F, Byte]): Resource[F, Readable] = - s.through(toReadable).compile.resource.lastOrError + def toReadableResource[F[_]](s: Stream[F, Byte])(implicit F: Async[F]): Resource[F, Readable] = + mkDuplex(s) + .flatMap { case (duplex, out) => + out + .concurrently( + Stream.eval( + F.async_[Unit](cb => + duplex.end { e => + cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException)) + } + ) + ) + ) + .compile + .drain + .background + .as(duplex) + } + .adaptError { case IOException(ex) => ex } /** Writes all bytes to the specified `Writable`. */ @@ -206,7 +206,7 @@ private[fs2] trait ioplatform { errorDispatcher <- Dispatcher.sequential[F] readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource writeChannel <- Channel.synchronous[F, Chunk[Byte]].toResource - error <- F.deferred[Throwable].toResource + interrupt <- F.deferred[Either[Throwable, Unit]].toResource duplex <- Resource.make { F.delay { new facade.stream.Duplex( @@ -236,10 +236,9 @@ private[fs2] trait ioplatform { var destroy = { (_, err, cb) => errorDispatcher.unsafeRunAndForget { - error + interrupt .complete( - Option(err) - .fold[Exception](new StreamDestroyedException)(js.JavaScriptException(_)) + Option(err).map(js.JavaScriptException(_)).toLeft(()) ) *> F.delay(cb(null)) } } @@ -254,10 +253,9 @@ private[fs2] trait ioplatform { } drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain out = writeChannel.stream.unchunks - .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) } yield ( duplex, - drainIn.merge(out).adaptError { case IOException(ex) => ex } + drainIn.merge(out).interruptWhen(interrupt).adaptError { case IOException(ex) => ex } ) /** Stream of bytes read asynchronously from standard input. */ diff --git a/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala index bf0826a7b9..7403c886a5 100644 --- a/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -114,4 +114,16 @@ class IoPlatformSuite extends Fs2Suite { .timeoutTo(100.millis, IO.unit) } + test("Destroying Node.js stream without error does not raise an exception") { + Stream + .never[IO] + .through { + toDuplexAndRead[IO] { duplex => + IO(duplex.destroy()) + } + } + .compile + .drain + } + }