Skip to content

Commit

Permalink
Merge from main
Browse files Browse the repository at this point in the history
  • Loading branch information
durban committed Feb 4, 2025
2 parents c298589 + 9188707 commit 9971791
Show file tree
Hide file tree
Showing 31 changed files with 540 additions and 348 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -454,5 +454,5 @@ jobs:
- name: Submit Dependencies
uses: scalacenter/sbt-dependency-submission@v2
with:
modules-ignore: choam-stress-data-slow_2.13 choam-stress-data-slow_3 choam_2.13 choam_3 choam-internal-helpers_2.13 choam-internal-helpers_3 choam-stress-mcas-slow_2.13 choam-stress-mcas-slow_3 choam-stress-experiments_2.13 choam-stress-experiments_3 choam-stress-linchk_2.13 choam-stress-linchk_3 choam-stress-mcas_2.13 choam-stress-mcas_3 choam-test-ext_2.13 choam-test-ext_3 choam-stress-linchk-agent_2.13 choam-stress-linchk-agent_3 choam-test-ext_sjs1_2.13 choam-test-ext_sjs1_3 choam-layout_2.13 choam-layout_3 choam-stress-old_2.13 choam-stress-old_3 choam-stress-core_2.13 choam-stress-core_3 choam-stress-data_2.13 choam-stress-data_3 choam-stress-async_2.13 choam-stress-async_3 choam-stress-rng_2.13 choam-stress-rng_3 choam-bench_2.13 choam-bench_3
modules-ignore: choam-stress-data-slow_2.13 choam-stress-data-slow_3 choam_2.13 choam_3 choam-internal-helpers_2.13 choam-internal-helpers_3 choam-stress-mcas-slow_2.13 choam-stress-mcas-slow_3 choam-stress-experiments_2.13 choam-stress-experiments_3 choam-stress-linchk_2.13 choam-stress-linchk_3 choam-stress-mcas_2.13 choam-stress-mcas_3 choam-test-ext_2.13 choam-test-ext_3 choam-test-ext_sjs1_2.13 choam-test-ext_sjs1_3 choam-layout_2.13 choam-layout_3 choam-stress-old_2.13 choam-stress-old_3 choam-stress-core_2.13 choam-stress-core_3 choam-stress-data_2.13 choam-stress-data_3 choam-stress-async_2.13 choam-stress-async_3 choam-stress-rng_2.13 choam-stress-rng_3 choam-bench_2.13 choam-bench_3
configs-ignore: test scala-tool scala-doc-tool test-internal
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import cats.Parallel
import cats.effect.IO
import cats.effect.std.{ Queue => CatsQueue }

import _root_.dev.tauri.choam.bench.BenchUtils
import dev.tauri.choam.bench.BenchUtils
import ce.unsafeImplicits._
import internal.mcas.Mcas

@Fork(2)
@Threads(1) // because it runs on the CE threadpool
Expand Down Expand Up @@ -112,8 +111,8 @@ object BoundedQueueBench {
val catsQ: CatsQueue[IO, String] =
CatsQueue.bounded[IO, String](Bound).unsafeRunSync()(runtime)
val rxnLinkedQ: CatsQueue[IO, String] =
BoundedQueue.linked[IO, String](Bound).unsafeRun(Mcas.Emcas).toCats
BoundedQueue.linked[IO, String](Bound).unsafeRun(asyncReactiveForIO.mcasImpl).toCats
val rxnArrayQ: CatsQueue[IO, String] =
BoundedQueue.array[IO, String](Bound).unsafeRun(Mcas.Emcas).toCats
BoundedQueue.array[IO, String](Bound).unsafeRun(asyncReactiveForIO.mcasImpl).toCats
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import org.openjdk.jmh.infra.Blackhole
import cats.effect.IO
import cats.effect.std.{ Queue => CatsQueue }

import _root_.dev.tauri.choam.bench.BenchUtils
import dev.tauri.choam.bench.BenchUtils
import ce.unsafeImplicits._
import internal.mcas.Mcas

@Fork(2)
@Threads(1)
Expand Down Expand Up @@ -75,8 +74,8 @@ object RingBufferBench {
val catsQ: CatsQueue[IO, String] =
CatsQueue.circularBuffer[IO, String](Capacity).unsafeRunSync()(runtime)
val rxnQStrict: CatsQueue[IO, String] =
OverflowQueue.ringBuffer[IO, String](Capacity).unsafeRun(Mcas.Emcas).toCats
OverflowQueue.ringBuffer[IO, String](Capacity).unsafeRun(asyncReactiveForIO.mcasImpl).toCats
val rxnQLazy: CatsQueue[IO, String] =
OverflowQueue.lazyRingBuffer[IO, String](Capacity).unsafeRun(Mcas.Emcas).toCats
OverflowQueue.lazyRingBuffer[IO, String](Capacity).unsafeRun(asyncReactiveForIO.mcasImpl).toCats
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import org.openjdk.jmh.annotations._
import cats.effect.IO
import fs2.concurrent.SignallingRef

import _root_.dev.tauri.choam.bench.BenchUtils
import dev.tauri.choam.bench.BenchUtils
import ce.unsafeImplicits._
import internal.mcas.Mcas

@Fork(2)
@Threads(1) // because it run on the CE compute pool
Expand Down Expand Up @@ -74,7 +73,7 @@ object SignallingRefBench {
val fs2Reset: IO[Unit] =
reset(fs2)
val rxn: SignallingRef[IO, String] =
stream.signallingRef[IO, String]("initial").unsafeRun(Mcas.Emcas)
stream.signallingRef[IO, String]("initial").unsafeRun(asyncReactiveForIO.mcasImpl)
val rxnReset: IO[Unit] =
reset(rxn)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package bench
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import _root_.dev.tauri.choam.bench.util.McasImplState
import dev.tauri.choam.bench.util.McasImplState

/**
* Resource allocation scenario, described in [Software transactional memory](
Expand Down
35 changes: 4 additions & 31 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.typesafe.tools.mima.core.{

// Scala versions:
val scala2 = "2.13.16"
val scala3 = "3.3.4"
val scala3 = "3.3.5"

// CI JVM versions:
val jvmOldest = JavaSpec.temurin("11")
Expand Down Expand Up @@ -262,7 +262,6 @@ lazy val choam = project.in(file("."))
stressAsync, // JVM
stressExperiments, // JVM
stressLinchk, // JVM
stressLinchkAgent, // JVM
stressRng, // JVM
layout, // JVM
)
Expand Down Expand Up @@ -356,6 +355,7 @@ lazy val mcas = crossProject(JVMPlatform, JSPlatform)
ProblemFilters.exclude[ReversedMissingMethodProblem]("dev.tauri.choam.internal.mcas.emcas.EmcasJmxStatsMBean.getMcasRetryStats"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("dev.tauri.choam.internal.mcas.emcas.EmcasJmxStatsMBean.getExchangerStats"),
ProblemFilters.exclude[DirectMissingMethodProblem]("dev.tauri.choam.internal.mcas.Mcas.internalEmcas"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("dev.tauri.choam.internal.mcas.AbstractDescriptor.validTsBoxed"),
),
)

Expand Down Expand Up @@ -691,37 +691,10 @@ lazy val stressLinchk = project.in(file("stress") / "stress-linchk")
.settings(commonSettingsJvm)
.disablePlugins(disabledPlugins: _*)
.enablePlugins(NoPublishPlugin)
.dependsOn(async.jvm % "compile->compile;test->test")
.dependsOn(stm.jvm % "compile->compile;test->test")
.settings(
libraryDependencies += dependencies.lincheck.value,
Test / fork := true, // otherwise the bytecode transformers won't work
Test / test := {
// we'll need the agent JAR to run the tests:
(stressLinchkAgent / Compile / packageBin).value.##
(Test / test).value
},
Test / testOnly := {
// we'll need the agent JAR to run the tests:
(stressLinchkAgent / Compile / packageBin).value.##
(Test / testOnly).evaluated
},
Test / javaOptions ++= List(
s"-javaagent:${(stressLinchkAgent / Compile / packageBin / artifactPath).value}",
// "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=127.0.0.1:8000",
),
)

lazy val stressLinchkAgent = project.in(file("stress") / "stress-linchk-agent")
.settings(name := "choam-stress-linchk-agent")
.settings(commonSettings)
.settings(commonSettingsJvm)
.disablePlugins(disabledPlugins: _*)
.enablePlugins(NoPublishPlugin)
.settings(
libraryDependencies += dependencies.asm.value,
packageOptions += Package.ManifestAttributes(
"Premain-Class" -> "dev.tauri.choam.lcagent.Premain",
),
)

lazy val stressRng = project.in(file("stress") / "stress-rng")
Expand Down Expand Up @@ -954,7 +927,7 @@ lazy val dependencies = new {
val jol = Def.setting("org.openjdk.jol" % "jol-core" % jolVersion)
val jmh = Def.setting("org.openjdk.jmh" % "jmh-core" % jmhVersion)
val jcTools = Def.setting("org.jctools" % "jctools-core" % "4.0.5") // https://github.com/JCTools/JCTools
val lincheck = Def.setting("org.jetbrains.kotlinx" % "lincheck-jvm" % "2.34") // https://github.com/JetBrains/lincheck
val lincheck = Def.setting("org.jetbrains.kotlinx" % "lincheck-jvm" % "2.35") // https://github.com/JetBrains/lincheck
val asm = Def.setting("org.ow2.asm" % "asm-commons" % "9.7.1") // https://asm.ow2.io/

// JS:
Expand Down
6 changes: 6 additions & 0 deletions ce/shared/src/main/scala/dev/tauri/choam/ce/BaseMixin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import internal.mcas.{ Mcas, OsRng }
private[ce] trait BaseMixin { this: IOApp =>

private[this] final val __mcasImpl: Mcas = {
// This might block, but as we're in the
// constructor of an `IOApp`, it's probably
// not a big deal (there is likely other
// similar initialization going on anyway);
// also, we're sure there is no `Rxn`
// running yet.
val osRng = OsRng.mkNew()
Mcas.newDefaultMcas(osRng)
}
Expand Down
12 changes: 12 additions & 0 deletions ce/shared/src/main/scala/dev/tauri/choam/ce/RxnAppMixin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import cats.effect.{ IO, IOApp }

import async.AsyncReactive

/**
* Mixin for convenient access to an `AsyncReactive[IO]`
*
* This trait is intended to be mixed into
* an object extending [[cats.effect.IOApp]].
* It provides an implicit `AsyncReactive[IO]`.
* instance. The resources needed by this
* instance are acquired in the constructor, and
* are never released. Thus, use only if the
* `AsyncReactive` is needed for the duration of
* the whole `IOApp` program.
*/
trait RxnAppMixin extends BaseMixin { this: IOApp =>

private[this] final val _asyncReactiveForIO: AsyncReactive[IO] =
Expand Down
12 changes: 12 additions & 0 deletions ce/shared/src/main/scala/dev/tauri/choam/ce/TxnAppMixin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import cats.effect.{ IO, IOApp }

import stm.Transactive

/**
* Mixin for convenient access to a `Transactive[IO]`
*
* This trait is intended to be mixed into
* an object extending [[cats.effect.IOApp]].
* It provides an implicit `Transactive[IO]`.
* instance. The resources needed by this
* instance are acquired in the constructor, and
* are never released. Thus, use only if the
* `Transactive` is needed for the duration of
* the whole `IOApp` program.
*/
trait TxnAppMixin extends BaseMixin { this: IOApp =>

private[this] final val _transactiveForIO: Transactive[IO] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private sealed trait ExchangerImplJvm[A, B]
throw ex
}
val ok = mergedDesc ne null
// TODO: mergedDesc can be null also if we couldn't extend!!!
if (ok) debugLog(s"merged logs - thread#${Thread.currentThread().getId()}")
else debugLog(s"ERROR: Couldn't merge logs - thread#${Thread.currentThread().getId()}")
Predef.assert(ok, s"Couldn't merge logs: ${selfMsg.desc} and ${other.msg.desc}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,8 @@ object RetryStrategy {

private[choam] final object Internal {

final def stepper[F[_]](implicit F: Async[F]): F[Stepper[F]] = {
Stepper[F]
}
final object Stepper {

private final object Stepper {
final def apply[F[_]](implicit F: Async[F]): F[Stepper[F]] = {
F.ref[Deferred[F, Unit]](null).map { state =>
new Stepper[F](state, F)
Expand Down
67 changes: 22 additions & 45 deletions core/shared/src/main/scala/dev/tauri/choam/core/Rxn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ object Rxn extends RxnInstances0 {
private[choam] final def cas[A](r: Ref[A], ov: A, nv: A): Axn[Unit] =
new Cas[A](r.loc, ov, nv)

final def retry[A, B]: Rxn[A, B] =
private[choam] final def retry[A, B]: Rxn[A, B] =
_AlwaysRetry.asInstanceOf[Rxn[A, B]]

private[choam] final def delay[A, B](uf: A => B): Rxn[A, B] =
Expand Down Expand Up @@ -765,18 +765,17 @@ object Rxn extends RxnInstances0 {
}

private final class SuspendUntilChanged(
descs: Array[AbstractDescriptor],
totalSize: Int,
desc: AbstractDescriptor,
) extends SuspendUntil {

final override def toString: String =
s"SuspendUntilChanged(${descs.mkString("[", ", ", "]")}, ${totalSize})"
s"SuspendUntilChanged($desc)"

final override def toF[F[_]](
mcasImpl: Mcas,
mcasCtx: Mcas.ThreadContext,
)(implicit F: Async[F]): F[Rxn[Any, Any]] = {
if (totalSize > 0) {
if ((desc ne null) && (desc.size > 0)) {
F.cont(new Cont[F, Rxn[Any, Any], Rxn[Any, Any]] {
final override def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable[Rxn[Any, Any]] { poll =>
Expand All @@ -788,7 +787,7 @@ object Rxn extends RxnInstances0 {
}
val refsAndCancelIds = subscribe(mcasImpl, mcasCtx, cb2)
if (refsAndCancelIds eq null) {
// some ref already changed, we're done:
// some ref already changed, don't suspend:
G.pure(null)
} else {
val unsubscribe: F[Unit] = F.delay {
Expand Down Expand Up @@ -822,19 +821,14 @@ object Rxn extends RxnInstances0 {
} else {
mcasImpl.currentContext()
}
val size = this.totalSize
val size = this.desc.size
val refs = new Array[MemoryLocation.WithListeners](size)
val cancelIds = new Array[Long](size)
var idx = 0
val it = this.descs.iterator
while (it.hasNext) {
val d = it.next()
idx = subscribeToDesc(ctx, cb, d, refs, cancelIds, idx)
if (idx == -1) {
return null // scalafix:ok
}
idx = subscribeToDesc(ctx, cb, this.desc, refs, cancelIds, idx)
if (idx == -1) {
return null // scalafix:ok
}

_assert(idx == size)
(refs, cancelIds)
}
Expand All @@ -854,6 +848,7 @@ object Rxn extends RxnInstances0 {
val loc = hwd.address.withListeners
val cancelId = loc.unsafeRegisterListener(ctx, cb, hwd.oldVersion)
if (cancelId == Consts.InvalidListenerId) {
// changed since we've seen it, we won't suspend:
this.undoSubscribe(idx, refs, cancelIds)
return -1 // scalafix:ok
}
Expand All @@ -880,6 +875,7 @@ object Rxn extends RxnInstances0 {
var ok = true
while (idx < len) {
ok &= ((refs(idx) eq null) && (cancelIds(idx) == 0L))
idx += 1
}
ok
})
Expand Down Expand Up @@ -1014,11 +1010,6 @@ object Rxn extends RxnInstances0 {
} else {
null
}
private[this] val discardedDescs: ArrayObjStack[AbstractDescriptor] = if (isStm) {
new ArrayObjStack(initSize = 16)
} else {
null
}

private[this] val contT: ByteStack = new ByteStack(initSize = 8)
private[this] var contK: ObjStack[Any] = mkInitialContK()
Expand Down Expand Up @@ -1214,6 +1205,7 @@ object Rxn extends RxnInstances0 {
}

private[this] final def discardStmAlt(): Unit = {
_assert(this.isStm)
this.stmAlts.popAndDiscard(6)
}

Expand All @@ -1227,12 +1219,16 @@ object Rxn extends RxnInstances0 {
}
}

private[this] final def saveDescForStm(): Unit = {
private[this] final def maybeMergeDescForStm(newDesc: Descriptor): Descriptor = {
if (this.isStm) {
val discarded = _desc
if ((discarded ne null) && discarded.nonEmpty) {
this.discardedDescs.push(discarded)
Descriptor.mergeReadsInto(newDesc, discarded)
} else {
newDesc
}
} else {
newDesc
}
}

Expand All @@ -1241,17 +1237,16 @@ object Rxn extends RxnInstances0 {
contKList.loadSnapshot(alts.pop().asInstanceOf[ListObjStack.Lst[Any]])
contT.loadSnapshot(alts.pop().asInstanceOf[Array[Byte]])
a = alts.pop()
this.saveDescForStm()
_desc = alts.pop().asInstanceOf[Descriptor]
_desc = this.maybeMergeDescForStm(alts.pop().asInstanceOf[Descriptor])
}

private[this] final def loadAltFrom(msg: Exchanger.Msg): Rxn[Any, R] = {
pc.loadSnapshot(msg.postCommit)
contKList.loadSnapshot(msg.contK)
contT.loadSnapshot(msg.contT)
a = msg.value
this.saveDescForStm() // TODO: write a test for this (exchange + STM)
desc = msg.desc
// TODO: write a test for this (exchange + STM)
desc = this.maybeMergeDescForStm(msg.desc)
next().asInstanceOf[Rxn[Any, R]]
}

Expand Down Expand Up @@ -1441,25 +1436,7 @@ object Rxn extends RxnInstances0 {
}
} else { // STM
_assert(canSuspend && this.isStm)
val discardedDescs = this.discardedDescs
val hasExtraDesc = (desc ne null) && desc.nonEmpty
val len = discardedDescs.length + (if (hasExtraDesc) 1 else 0)
val descs: Array[AbstractDescriptor] = new Array[AbstractDescriptor](len)
var totalSize = 0
var idx = 0
if (hasExtraDesc) {
descs(0) = desc
totalSize += desc.size
idx += 1
}
while (idx < len) {
val d = discardedDescs.pop()
descs(idx) = d
totalSize += d.size
idx += 1
}
_assert(discardedDescs.isEmpty())
new SuspendUntilChanged(descs, totalSize)
new SuspendUntilChanged(desc)
}
}

Expand Down
Loading

0 comments on commit 9971791

Please sign in to comment.