diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 25c5f08f1b..ee20ff91cb 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -637,7 +637,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { def unsafeRunAsync(cb: Either[Throwable, A] => Unit)( implicit runtime: unsafe.IORuntime): Unit = { unsafeRunFiber( - cb(Left(new CancellationException("The fiber was canceled"))), + cb(Left(new CancellationException("Main fiber was canceled"))), t => cb(Left(t)), a => cb(Right(a))) () @@ -676,46 +676,17 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { * * @see [[IO.fromFuture]] */ - def unsafeToFuture()(implicit runtime: unsafe.IORuntime): Future[A] = - unsafeToFutureCancelable()._1 - - /** - * Evaluates the effect and produces the result in a `Future`, along with a - * cancelation token that can be used to cancel the original effect. - * - * This is similar to `unsafeRunAsync` in that it evaluates the `IO` - * as a side effect in a non-blocking fashion, but uses a `Future` - * rather than an explicit callback. This function should really - * only be used if interoperating with code which uses Scala futures. - * - * @see [[IO.fromFuture]] - */ - def unsafeToFutureCancelable()( - implicit runtime: unsafe.IORuntime): (Future[A], () => Future[Unit]) = { + def unsafeToFuture()(implicit runtime: unsafe.IORuntime): Future[A] = { val p = Promise[A]() - val fiber = unsafeRunFiber( - p.failure(new CancellationException("The fiber was canceled")), - p.failure, - p.success) + unsafeRunAsync { + case Left(t) => p.failure(t) + case Right(a) => p.success(a) + } - (p.future, () => fiber.cancel.unsafeToFuture()) + p.future } - /** - * Evaluates the effect, returning a cancelation token that can be used to - * cancel it. - * - * This is similar to `unsafeRunAsync` in that it evaluates the `IO` - * as a side effect in a non-blocking fashion, but uses a `Future` - * rather than an explicit callback. This function should really - * only be used if interoperating with code which uses Scala futures. - * - * @see [[IO.fromFuture]] - */ - def unsafeRunCancelable()(implicit runtime: unsafe.IORuntime): () => Future[Unit] = - unsafeToFutureCancelable()._2 - private[effect] def unsafeRunFiber( canceled: => Unit, failure: Throwable => Unit, diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 740edc5083..9ee0a5fb5b 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -17,7 +17,6 @@ package cats.effect import cats.syntax.all._ -import cats.effect.std.Semaphore import org.scalacheck.Prop.forAll @@ -27,7 +26,7 @@ import org.specs2.mutable.Specification import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import java.util.concurrent.{CancellationException, CountDownLatch, Executors} +import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.CompletableFuture abstract class IOPlatformSpecification extends Specification with ScalaCheck with Runners { @@ -216,46 +215,6 @@ abstract class IOPlatformSpecification extends Specification with ScalaCheck wit op must completeAs(true) } - "cancel all inner effects when canceled" in ticked { implicit ticker => - val deadlock = for { - gate1 <- Semaphore[IO](2) - _ <- gate1.acquireN(2) - - gate2 <- Semaphore[IO](2) - _ <- gate2.acquireN(2) - - io = IO { - // these finalizers never return, so this test is intentionally designed to hang - // they flip their gates first though; this is just testing that both run in parallel - val a = (gate1.release *> IO.never) onCancel { - gate2.release *> IO.never - } - - val b = (gate1.release *> IO.never) onCancel { - gate2.release *> IO.never - } - - a.unsafeRunAndForget() - b.unsafeRunAndForget() - } - - _ <- io.flatMap(_ => gate1.acquireN(2)).start - _ <- gate2.acquireN(2) // if both are not run in parallel, then this will hang - } yield () - - val test = for { - t <- IO(deadlock.unsafeToFutureCancelable()) - (f, ct) = t - _ <- IO.fromFuture(IO(ct())) - _ <- IO.blocking(scala.concurrent.Await.result(f, Duration.Inf)) - } yield () - - test.attempt.map { - case Left(t) => t.isInstanceOf[CancellationException] - case Right(_) => false - } must completeAs(true) - } - } } } diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index a672f32bae..5c5df4efdb 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -20,10 +20,10 @@ import cats.kernel.laws.discipline.MonoidTests import cats.laws.discipline.{AlignTests, SemigroupKTests} import cats.laws.discipline.arbitrary._ -import cats.effect.implicits._ import cats.effect.laws.AsyncTests import cats.effect.testkit.TestContext import cats.syntax.all._ +import cats.effect.implicits._ import org.scalacheck.Prop, Prop.forAll // import org.scalacheck.rng.Seed @@ -1094,61 +1094,6 @@ class IOSpec extends IOPlatformSpecification with Discipline with ScalaCheck wit } } - "run a synchronous IO" in ticked { implicit ticker => - val ioa = IO(1).map(_ + 2) - val test = IO.fromFuture(IO(ioa.unsafeToFuture())) - test must completeAs(3) - } - - "run an asynchronous IO" in ticked { implicit ticker => - val ioa = (IO(1) <* IO.cede).map(_ + 2) - val test = IO.fromFuture(IO(ioa.unsafeToFuture())) - test must completeAs(3) - } - - "run several IOs back to back" in ticked { implicit ticker => - var counter = 0 - val increment = IO { - counter += 1 - } - - val num = 10 - - val test = IO.fromFuture(IO(increment.unsafeToFuture())).replicateA(num).void - - test.flatMap(_ => IO(counter)) must completeAs(num) - } - - "run multiple IOs in parallel" in ticked { implicit ticker => - val num = 10 - - val test = for { - latches <- (0 until num).toList.traverse(_ => Deferred[IO, Unit]) - awaitAll = latches.parTraverse_(_.get) - - // engineer a deadlock: all subjects must be run in parallel or this will hang - subjects = latches.map(latch => latch.complete(()) >> awaitAll) - - _ <- subjects.parTraverse_(act => IO(act.unsafeRunAndForget())) - } yield () - - test must completeAs(()) - } - - "forward cancelation onto the inner action" in ticked { implicit ticker => - var canceled = false - - val run = IO { - IO.never.onCancel(IO { canceled = true }).unsafeRunCancelable() - } - - val test = IO.defer { - run.flatMap(ct => IO.sleep(500.millis) >> IO.fromFuture(IO(ct()))) - } - - test.flatMap(_ => IO(canceled)) must completeAs(true) - } - } "temporal" should {