Skip to content

Commit

Permalink
Merge pull request #3846 from djspiewak/build/backport-stability
Browse files Browse the repository at this point in the history
Backport changes from minor branch
  • Loading branch information
djspiewak authored Sep 25, 2023
2 parents a6c5b15 + f436fe6 commit a11d81a
Show file tree
Hide file tree
Showing 29 changed files with 279 additions and 104 deletions.
6 changes: 6 additions & 0 deletions .cirrus.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
jvm_highcore_task:
only_if: $CIRRUS_TAG != '' || $CIRRUS_PR != ''
required_pr_labels: Cirrus JVM
container:
image: sbtscala/scala-sbt:eclipse-temurin-jammy-17.0.5_8_1.9.0_3.3.0
cpu: 4
Expand All @@ -10,6 +12,8 @@ jvm_highcore_task:
script: sbt '++ 3' testsJVM/test ioAppTestsJVM/test

jvm_arm_highcore_task:
only_if: $CIRRUS_TAG != '' || $CIRRUS_PR != ''
required_pr_labels: Cirrus JVM
arm_container:
image: sbtscala/scala-sbt:eclipse-temurin-jammy-17.0.5_8_1.9.0_3.3.0
cpu: 4
Expand All @@ -21,6 +25,8 @@ jvm_arm_highcore_task:
script: sbt '++ 3' testsJVM/test ioAppTestsJVM/test

native_arm_task:
only_if: $CIRRUS_TAG != '' || $CIRRUS_PR != ''
required_pr_labels: Cirrus Native
arm_container:
dockerfile: .cirrus/Dockerfile
cpu: 2
Expand Down
15 changes: 15 additions & 0 deletions .github/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
changelog:
categories:
- title: Features
labels:
- :mushroom: enhancement
- title: Bug Fixes
labels:
- :beetle: bug
- title: Behind the Scenes
labels:
- :gear: infrastructure
- :robot:
- title: Documentation
labels:
- :books: docs
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

## Getting Started

- Wired: **3.4.11**
- Wired: **3.5.1**
- Tired: **2.5.5** (end of life)

```scala
Expand Down
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.scope"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.IOFiberConstants.ContStateResult"),
// #3775, changes to internal timers APIs
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"cats.effect.unsafe.TimerSkipList.insert"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"cats.effect.unsafe.WorkerThread.sleep"),
// #3787, internal utility that was no longer needed
ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$"),
// introduced by #3332, polling system
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.IORuntimeBuilder.this"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[effect] sealed abstract class WorkStealingThreadPool[P] private ()
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,11 @@ private[effect] final class BatchingMacrotaskExecutor(
var i = 0
while (i < batchSize && !fibers.isEmpty()) {
val fiber = fibers.take()

if (LinkingInfo.developmentMode)
if (fiberBag ne null)
fiberBag -= fiber

try fiber.run()
catch {
case t if NonFatal(t) => reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}

i += 1
}

Expand All @@ -100,10 +94,6 @@ private[effect] final class BatchingMacrotaskExecutor(
* batch.
*/
def schedule(fiber: IOFiber[_]): Unit = {
if (LinkingInfo.developmentMode)
if (fiberBag ne null)
fiberBag += fiber

fibers.offer(fiber)

if (needsReschedule) {
Expand All @@ -117,8 +107,12 @@ private[effect] final class BatchingMacrotaskExecutor(

def reportFailure(t: Throwable): Unit = reportFailure0(t)

def liveTraces(): Map[IOFiber[_], Trace] =
fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap
def liveTraces(): Map[IOFiber[_], Trace] = {
val traces = Map.newBuilder[IOFiber[_], Trace]
fibers.foreach(f => if (!f.isDone) traces += f -> f.captureTrace())
fiberBag.foreach(f => if (!f.isDone) traces += f -> f.captureTrace())
traces.result()
}

@inline private[this] def monitor(runnable: Runnable): Runnable =
if (LinkingInfo.developmentMode)
Expand Down
21 changes: 21 additions & 0 deletions core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,25 @@ private final class JSArrayQueue[A] {
}
}

@inline def foreach(f: A => Unit): Unit =
if (empty) ()
else if (startIndex < endIndex) { // consecutive in middle of buffer
var i = startIndex
while (i < endIndex) {
f(buffer(i))
i += 1
}
} else { // split across tail and init of buffer
var i = startIndex
while (i < buffer.length) {
f(buffer(i))
i += 1
}
i = 0
while (i < endIndex) {
f(buffer(i))
i += 1
}
}

}
52 changes: 47 additions & 5 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,26 @@ trait IOApp {
)

/**
* Configures the action to perform when unhandled errors are caught by the runtime. By
* default, this simply delegates to [[cats.effect.std.Console!.printStackTrace]]. It is safe
* to perform any `IO` action within this handler; it will not block the progress of the
* runtime. With that said, some care should be taken to avoid raising unhandled errors as a
* result of handling unhandled errors, since that will result in the obvious chaos.
* Configures the action to perform when unhandled errors are caught by the runtime. An
* unhandled error is an error that is raised (and not handled) on a Fiber that nobody is
* joining.
*
* For example:
*
* {{{
* import scala.concurrent.duration._
* override def run: IO[Unit] = IO(throw new Exception("")).start *> IO.sleep(1.second)
* }}}
*
* In this case, the exception is raised on a Fiber with no listeners. Nobody would be
* notified about that error. Therefore it is unhandled, and it goes through the reportFailure
* mechanism.
*
* By default, `reportFailure` simply delegates to
* [[cats.effect.std.Console!.printStackTrace]]. It is safe to perform any `IO` action within
* this handler; it will not block the progress of the runtime. With that said, some care
* should be taken to avoid raising unhandled errors as a result of handling unhandled errors,
* since that will result in the obvious chaos.
*/
protected def reportFailure(err: Throwable): IO[Unit] =
Console[IO].printStackTrace(err)
Expand Down Expand Up @@ -317,6 +332,32 @@ trait IOApp {
protected def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
CpuStarvationCheck.logWarning(metrics)

/**
* Defines what to do when IOApp detects that `main` is being invoked on a `Thread` which
* isn't the main process thread. This condition can happen when we are running inside of an
* `sbt run` with `fork := false`
*/
private def onNonMainThreadDetected(): Unit = {
val shouldPrint =
Option(System.getProperty("cats.effect.warnOnNonMainThreadDetected"))
.map(_.equalsIgnoreCase("true"))
.getOrElse(true)
if (shouldPrint)
System
.err
.println(
"""|[WARNING] IOApp `main` is running on a thread other than the main thread.
|This may prevent correct resource cleanup after `main` completes.
|This condition could be caused by executing `run` in an interactive sbt session with `fork := false`.
|Set `Compile / run / fork := true` in this project to resolve this.
|
|To silence this warning set the system property:
|`-Dcats.effect.warnOnNonMainThreadDetected=false`.
|""".stripMargin
)
else ()
}

/**
* The entry point for your application. Will be called by the runtime when the process is
* started. If the underlying runtime supports it, any arguments passed to the process will be
Expand All @@ -336,6 +377,7 @@ trait IOApp {
final def main(args: Array[String]): Unit = {
// checked in openjdk 8-17; this attempts to detect when we're running under artificial environments, like sbt
val isForked = Thread.currentThread().getId() == 1
if (!isForked) onNonMainThreadDetected()

val installed = if (runtime == null) {
import unsafe.IORuntime
Expand Down
8 changes: 4 additions & 4 deletions core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
* Implements [[cats.effect.kernel.Sync.blocking]].
*/
def blocking[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
val fn = () => thunk
Blocking(TypeBlocking, fn, Tracing.calculateTracingEvent(fn.getClass))
}

// this cannot be marked private[effect] because of static forwarders in Java
@deprecated("use interruptible / interruptibleMany instead", "3.3.0")
def interruptible[A](many: Boolean, thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
val fn = () => thunk
Blocking(
if (many) TypeInterruptibleMany else TypeInterruptibleOnce,
fn,
Expand All @@ -80,7 +80,7 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
* Implements [[cats.effect.kernel.Sync.interruptible[A](thunk:=>A):*]]
*/
def interruptible[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
val fn = () => thunk
Blocking(TypeInterruptibleOnce, fn, Tracing.calculateTracingEvent(fn.getClass))
}

Expand All @@ -104,7 +104,7 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
* Implements [[cats.effect.kernel.Sync!.interruptibleMany]]
*/
def interruptibleMany[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
val fn = () => thunk
Blocking(TypeInterruptibleMany, fn, Tracing.calculateTracingEvent(fn.getClass))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,21 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
cb: Callback,
next: Node
) extends TimerSkipListNodeBase[Callback, Node](cb, next)
with Function0[Unit]
with Runnable {

/**
* Cancels the timer
*/
final override def run(): Unit = {
final def apply(): Unit = {
// TODO: We could null the callback here directly,
// TODO: and the do the lookup after (for unlinking).
TimerSkipList.this.doRemove(triggerTime, sequenceNum)
()
}

final def run() = apply()

private[TimerSkipList] final def isMarker: Boolean = {
// note: a marker node also has `triggerTime == MARKER`,
// but that's also a valid trigger time, so we need
Expand Down Expand Up @@ -158,7 +161,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
delay: Long,
callback: Right[Nothing, Unit] => Unit,
tlr: ThreadLocalRandom
): Runnable = {
): Function0[Unit] with Runnable = {
require(delay >= 0L)
// we have to check for overflow:
val triggerTime = computeTriggerTime(now = now, delay = delay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,9 @@ private[effect] final class WorkStealingThreadPool[P](
/**
* Tries to call the current worker's `sleep`, but falls back to `sleepExternal` if needed.
*/
def sleepInternal(delay: FiniteDuration, callback: Right[Nothing, Unit] => Unit): Runnable = {
def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread[_]]) {
val worker = thread.asInstanceOf[WorkerThread[P]]
Expand All @@ -664,7 +666,7 @@ private[effect] final class WorkStealingThreadPool[P](
*/
private[this] final def sleepExternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable = {
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val random = ThreadLocalRandom.current()
val idx = random.nextInt(threadCount)
val tsl = sleepers(idx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ private final class WorkerThread[P](
}
}

def sleep(delay: FiniteDuration, callback: Right[Nothing, Unit] => Unit): Runnable = {
def sleep(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
// take the opportunity to update the current time, just in case other timers can benefit
val _now = System.nanoTime()
now = _now
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
* Any exceptions thrown by the effect will be caught and sequenced into the `IO`.
*/
def delay[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
val fn = () => thunk
Delay(fn, Tracing.calculateTracingEvent(fn))
}

Expand Down
18 changes: 12 additions & 6 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -926,13 +926,19 @@ private final class IOFiber[A](
IO {
val scheduler = runtime.scheduler

val cancel =
if (scheduler.isInstanceOf[WorkStealingThreadPool[_]])
scheduler.asInstanceOf[WorkStealingThreadPool[_]].sleepInternal(delay, cb)
else
scheduler.sleep(delay, () => cb(RightUnit))
val cancelIO =
if (scheduler.isInstanceOf[WorkStealingThreadPool[_]]) {
val cancel =
scheduler
.asInstanceOf[WorkStealingThreadPool[_]]
.sleepInternal(delay, cb)
IO.Delay(cancel, null)
} else {
val cancel = scheduler.sleep(delay, () => cb(RightUnit))
IO(cancel.run())
}

Some(IO(cancel.run()))
Some(cancelIO)
}
}
else IO.cede
Expand Down
29 changes: 0 additions & 29 deletions core/shared/src/main/scala/cats/effect/Thunk.scala

This file was deleted.

3 changes: 2 additions & 1 deletion docs/core/io-runtime-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ This can be done for example with the [EnvironmentPlugin for Webpack](https://we
| `cats.effect.detectBlockedThreads` <br/> N/A | `Boolean` (`false`) | Whether or not we should detect blocked threads. |
| `cats.effect.logNonDaemonThreadsOnExit` <br/> N/A | `Boolean` (`true`) | Whether or not we should check for non-daemon threads on JVM exit. |
| `cats.effect.logNonDaemonThreads.sleepIntervalMillis` <br/> N/A | `Long` (`10000L`) | Time to sleep between checking for presence of non-daemon threads. |
| `cats.effect.cancelation.check.threshold ` <br/> `CATS_EFFECT_CANCELATION_CHECK_THRESHOLD` | `Int` (`512`) | Configure how often cancellation is checked. By default, every 512 iterations of the run loop. |
| `cats.effect.warnOnNonMainThreadDetected` <br/> N/A | `Boolean` (`true`) | Print a warning message when IOApp `main` runs on a non-main thread |
| `cats.effect.cancelation.check.threshold` <br/> `CATS_EFFECT_CANCELATION_CHECK_THRESHOLD` | `Int` (`512`) | Configure how often cancellation is checked. By default, every 512 iterations of the run loop. |
| `cats.effect.auto.yield.threshold.multiplier` <br/> `CATS_EFFECT_AUTO_YIELD_THRESHOLD_MULTIPLIER` | `Int` (`2`) | `autoYieldThreshold = autoYieldThresholdMultiplier x cancelationCheckThreshold`. See [thread model](../thread-model.md). |
| `cats.effect.tracing.exceptions.enhanced` <br/> `CATS_EFFECT_TRACING_EXCEPTIONS_ENHANCED` | `Boolean` (`true`) | Augment the stack traces of caught exceptions to include frames from the asynchronous stack traces. See [tracing](../tracing.md). |
| `cats.effect.tracing.buffer.size` <br/> `CATS_EFFECT_TRACING_BUFFER_SIZE` | `Int` (`16`) | Number of stack frames retained in the tracing buffer. Will be rounded up to next power of two. |
Expand Down
2 changes: 1 addition & 1 deletion docs/thread-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ purely functional promise that can only be completed once.
trait Deferred[F[_], A] {
def get: F[A]

def complete(a: A): F[Unit]
def complete(a: A): F[Boolean]
}
```

Expand Down
Loading

0 comments on commit a11d81a

Please sign in to comment.