Skip to content

Commit

Permalink
Revert "Add IO#unsafeToFutureCancelable"
Browse files Browse the repository at this point in the history
  • Loading branch information
djspiewak authored Apr 10, 2021
1 parent a78d82e commit 6f89a4d
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 134 deletions.
43 changes: 7 additions & 36 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def unsafeRunAsync(cb: Either[Throwable, A] => Unit)(
implicit runtime: unsafe.IORuntime): Unit = {
unsafeRunFiber(
cb(Left(new CancellationException("The fiber was canceled"))),
cb(Left(new CancellationException("Main fiber was canceled"))),
t => cb(Left(t)),
a => cb(Right(a)))
()
Expand Down Expand Up @@ -676,46 +676,17 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
*
* @see [[IO.fromFuture]]
*/
def unsafeToFuture()(implicit runtime: unsafe.IORuntime): Future[A] =
unsafeToFutureCancelable()._1

/**
* Evaluates the effect and produces the result in a `Future`, along with a
* cancelation token that can be used to cancel the original effect.
*
* This is similar to `unsafeRunAsync` in that it evaluates the `IO`
* as a side effect in a non-blocking fashion, but uses a `Future`
* rather than an explicit callback. This function should really
* only be used if interoperating with code which uses Scala futures.
*
* @see [[IO.fromFuture]]
*/
def unsafeToFutureCancelable()(
implicit runtime: unsafe.IORuntime): (Future[A], () => Future[Unit]) = {
def unsafeToFuture()(implicit runtime: unsafe.IORuntime): Future[A] = {
val p = Promise[A]()

val fiber = unsafeRunFiber(
p.failure(new CancellationException("The fiber was canceled")),
p.failure,
p.success)
unsafeRunAsync {
case Left(t) => p.failure(t)
case Right(a) => p.success(a)
}

(p.future, () => fiber.cancel.unsafeToFuture())
p.future
}

/**
* Evaluates the effect, returning a cancelation token that can be used to
* cancel it.
*
* This is similar to `unsafeRunAsync` in that it evaluates the `IO`
* as a side effect in a non-blocking fashion, but uses a `Future`
* rather than an explicit callback. This function should really
* only be used if interoperating with code which uses Scala futures.
*
* @see [[IO.fromFuture]]
*/
def unsafeRunCancelable()(implicit runtime: unsafe.IORuntime): () => Future[Unit] =
unsafeToFutureCancelable()._2

private[effect] def unsafeRunFiber(
canceled: => Unit,
failure: Throwable => Unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package cats.effect

import cats.syntax.all._
import cats.effect.std.Semaphore

import org.scalacheck.Prop.forAll

Expand All @@ -27,7 +26,7 @@ import org.specs2.mutable.Specification
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import java.util.concurrent.{CancellationException, CountDownLatch, Executors}
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.CompletableFuture

abstract class IOPlatformSpecification extends Specification with ScalaCheck with Runners {
Expand Down Expand Up @@ -216,46 +215,6 @@ abstract class IOPlatformSpecification extends Specification with ScalaCheck wit
op must completeAs(true)
}

"cancel all inner effects when canceled" in ticked { implicit ticker =>
val deadlock = for {
gate1 <- Semaphore[IO](2)
_ <- gate1.acquireN(2)

gate2 <- Semaphore[IO](2)
_ <- gate2.acquireN(2)

io = IO {
// these finalizers never return, so this test is intentionally designed to hang
// they flip their gates first though; this is just testing that both run in parallel
val a = (gate1.release *> IO.never) onCancel {
gate2.release *> IO.never
}

val b = (gate1.release *> IO.never) onCancel {
gate2.release *> IO.never
}

a.unsafeRunAndForget()
b.unsafeRunAndForget()
}

_ <- io.flatMap(_ => gate1.acquireN(2)).start
_ <- gate2.acquireN(2) // if both are not run in parallel, then this will hang
} yield ()

val test = for {
t <- IO(deadlock.unsafeToFutureCancelable())
(f, ct) = t
_ <- IO.fromFuture(IO(ct()))
_ <- IO.blocking(scala.concurrent.Await.result(f, Duration.Inf))
} yield ()

test.attempt.map {
case Left(t) => t.isInstanceOf[CancellationException]
case Right(_) => false
} must completeAs(true)
}

}
}
}
57 changes: 1 addition & 56 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import cats.kernel.laws.discipline.MonoidTests
import cats.laws.discipline.{AlignTests, SemigroupKTests}
import cats.laws.discipline.arbitrary._

import cats.effect.implicits._
import cats.effect.laws.AsyncTests
import cats.effect.testkit.TestContext
import cats.syntax.all._
import cats.effect.implicits._

import org.scalacheck.Prop, Prop.forAll
// import org.scalacheck.rng.Seed
Expand Down Expand Up @@ -1094,61 +1094,6 @@ class IOSpec extends IOPlatformSpecification with Discipline with ScalaCheck wit
}
}

"run a synchronous IO" in ticked { implicit ticker =>
val ioa = IO(1).map(_ + 2)
val test = IO.fromFuture(IO(ioa.unsafeToFuture()))
test must completeAs(3)
}

"run an asynchronous IO" in ticked { implicit ticker =>
val ioa = (IO(1) <* IO.cede).map(_ + 2)
val test = IO.fromFuture(IO(ioa.unsafeToFuture()))
test must completeAs(3)
}

"run several IOs back to back" in ticked { implicit ticker =>
var counter = 0
val increment = IO {
counter += 1
}

val num = 10

val test = IO.fromFuture(IO(increment.unsafeToFuture())).replicateA(num).void

test.flatMap(_ => IO(counter)) must completeAs(num)
}

"run multiple IOs in parallel" in ticked { implicit ticker =>
val num = 10

val test = for {
latches <- (0 until num).toList.traverse(_ => Deferred[IO, Unit])
awaitAll = latches.parTraverse_(_.get)

// engineer a deadlock: all subjects must be run in parallel or this will hang
subjects = latches.map(latch => latch.complete(()) >> awaitAll)

_ <- subjects.parTraverse_(act => IO(act.unsafeRunAndForget()))
} yield ()

test must completeAs(())
}

"forward cancelation onto the inner action" in ticked { implicit ticker =>
var canceled = false

val run = IO {
IO.never.onCancel(IO { canceled = true }).unsafeRunCancelable()
}

val test = IO.defer {
run.flatMap(ct => IO.sleep(500.millis) >> IO.fromFuture(IO(ct())))
}

test.flatMap(_ => IO(canceled)) must completeAs(true)
}

}

"temporal" should {
Expand Down

0 comments on commit 6f89a4d

Please sign in to comment.