diff --git a/README.md b/README.md index e11543bc..4e53f7ce 100644 --- a/README.md +++ b/README.md @@ -609,7 +609,7 @@ println(s"After set: ${myVar.now()}") myVar.update(_ + 1) println(s"After update: ${myVar.now()}") -Transaction { +Transaction { _ => println(s"After trx: ${myVar.now()}") } diff --git a/src/main/scala/com/raquo/airstream/combine/MergeStream.scala b/src/main/scala/com/raquo/airstream/combine/MergeStream.scala index 24388837..c48dcf4f 100644 --- a/src/main/scala/com/raquo/airstream/combine/MergeStream.scala +++ b/src/main/scala/com/raquo/airstream/combine/MergeStream.scala @@ -55,8 +55,8 @@ class MergeStream[A]( if (lastFiredInTrx.contains(transaction)) { //println("- syncFire in new trx") nextValue.fold( - nextError => new Transaction(fireError(nextError, _)), - nextEvent => new Transaction(fireValue(nextEvent, _)) + nextError => Transaction(fireError(nextError, _)), + nextEvent => Transaction(fireValue(nextEvent, _)) ) } else { lastFiredInTrx = transaction diff --git a/src/main/scala/com/raquo/airstream/core/Transaction.scala b/src/main/scala/com/raquo/airstream/core/Transaction.scala index f47475a8..74714ba1 100644 --- a/src/main/scala/com/raquo/airstream/core/Transaction.scala +++ b/src/main/scala/com/raquo/airstream/core/Transaction.scala @@ -59,15 +59,14 @@ class Transaction(private[Transaction] var code: Transaction => Any) { object Transaction { - /** Create new transaction (typically used in internal observable code) */ - def apply(code: Transaction => Unit): Unit = new Transaction(code) - - /** Create new transaction (typically used in end user code). + /** Create new transaction. * - * Warning: It is rare that you need to manually create transactions. + * Typically used in internal observable code. + * + * Warning: It is rare that end users need to manually create transactions. * Example of legitimate use case: [[https://github.com/raquo/Airstream/#var-transaction-delay Var transaction delay]] */ - def apply(code: => Unit): Unit = new Transaction(_ => code) + def apply(code: Transaction => Unit): Unit = new Transaction(code) /** This object holds a queue of callbacks that should be executed * when all observables finish starting. This lets `signal.changes` @@ -158,7 +157,7 @@ object Transaction { //println("- no pending callbacks") if (postStartTransactions.length > 0) { //println(s"> CREATE ALT RESOLVE TRX. Num trx-s = ${postStartTransactions.length}") - Transaction { + Transaction { _ => while (postStartTransactions.length > 0) { pendingTransactions.add(postStartTransactions.shift()) } diff --git a/src/main/scala/com/raquo/airstream/custom/CustomSignalSource.scala b/src/main/scala/com/raquo/airstream/custom/CustomSignalSource.scala index ec7fb0e3..1bae74f3 100644 --- a/src/main/scala/com/raquo/airstream/custom/CustomSignalSource.scala +++ b/src/main/scala/com/raquo/airstream/custom/CustomSignalSource.scala @@ -17,7 +17,7 @@ class CustomSignalSource[A] ( ) extends WritableSignal[A] with CustomSource[A] { override protected[this] val config: Config = makeConfig( - value => new Transaction(fireTry(value, _)), + value => Transaction(fireTry(value, _)), () => tryNow(), () => startIndex, () => isStarted diff --git a/src/main/scala/com/raquo/airstream/custom/CustomSource.scala b/src/main/scala/com/raquo/airstream/custom/CustomSource.scala index caa7deb1..e634db2a 100644 --- a/src/main/scala/com/raquo/airstream/custom/CustomSource.scala +++ b/src/main/scala/com/raquo/airstream/custom/CustomSource.scala @@ -31,7 +31,7 @@ trait CustomSource[A] extends WritableObservable[A] { override protected[this] def onStart(): Unit = { Try(config.onStart()).recover[Unit] { - case err: Throwable => new Transaction(fireError(err, _)) + case err: Throwable => Transaction(fireError(err, _)) } super.onStart() } diff --git a/src/main/scala/com/raquo/airstream/custom/CustomStreamSource.scala b/src/main/scala/com/raquo/airstream/custom/CustomStreamSource.scala index dffe2d08..c5bd4105 100644 --- a/src/main/scala/com/raquo/airstream/custom/CustomStreamSource.scala +++ b/src/main/scala/com/raquo/airstream/custom/CustomStreamSource.scala @@ -12,12 +12,8 @@ class CustomStreamSource[A] ( ) extends WritableStream[A] with CustomSource[A] { override protected[this] val config: Config = makeConfig( - value => { - new Transaction(fireValue(value, _)) - }, - err => { - new Transaction(fireError(err, _)) - }, + value => Transaction(fireValue(value, _)), + err => Transaction(fireError(err, _)), () => startIndex, () => isStarted ) diff --git a/src/main/scala/com/raquo/airstream/eventbus/EventBusStream.scala b/src/main/scala/com/raquo/airstream/eventbus/EventBusStream.scala index 09a630ca..49434144 100644 --- a/src/main/scala/com/raquo/airstream/eventbus/EventBusStream.scala +++ b/src/main/scala/com/raquo/airstream/eventbus/EventBusStream.scala @@ -41,7 +41,7 @@ class EventBusStream[A] private[eventbus] () extends WritableStream[A] with Inte //println(s"> init trx from EventBusStream(${nextValue})") - new Transaction(fireValue(nextValue, _)) + Transaction(fireValue(nextValue, _)) } /** Helper method to support batch emit using `WriteBus.emit` / `WriteBus.emitTry` */ @@ -55,7 +55,7 @@ class EventBusStream[A] private[eventbus] () extends WritableStream[A] with Inte } override protected def onError(nextError: Throwable, transaction: Transaction): Unit = { - new Transaction(fireError(nextError, _)) + Transaction(fireError(nextError, _)) } override protected def onWillStart(): Unit = { diff --git a/src/main/scala/com/raquo/airstream/eventbus/WriteBus.scala b/src/main/scala/com/raquo/airstream/eventbus/WriteBus.scala index 972a91ca..757629c5 100644 --- a/src/main/scala/com/raquo/airstream/eventbus/WriteBus.scala +++ b/src/main/scala/com/raquo/airstream/eventbus/WriteBus.scala @@ -96,7 +96,7 @@ object WriteBus { if (hasDuplicateTupleKeys(values.map(_.tuple))) { throw new Exception("Unable to {EventBus,WriteBus}.emit: the provided list of event buses has duplicates. You can't make an observable emit more than one event per transaction.") } - new Transaction(trx => values.foreach(emitValue(_, trx))) + Transaction(trx => values.foreach(emitValue(_, trx))) } /** Emit events into several WriteBus-es at once (in the same transaction) @@ -107,7 +107,7 @@ object WriteBus { if (hasDuplicateTupleKeys(values.map(_.tuple))) { throw new Exception("Unable to {EventBus,WriteBus}.emitTry: the provided list of event buses has duplicates. You can't make an observable emit more than one event per transaction.") } - new Transaction(trx => values.foreach(emitTryValue(_, trx))) + Transaction(trx => values.foreach(emitTryValue(_, trx))) } @inline private def emitValue[A](tuple: BusTuple[A], transaction: Transaction): Unit = { diff --git a/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala b/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala index 7ad212b8..ba4259c1 100644 --- a/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala +++ b/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala @@ -20,8 +20,8 @@ class ConcurrentStream[A]( private val accumulatedStreams: JsArray[EventStream[A]] = JsArray() private val internalEventObserver: InternalObserver[A] = InternalObserver[A]( - onNext = (nextEvent, _) => new Transaction(fireValue(nextEvent, _)), - onError = (nextError, _) => new Transaction(fireError(nextError, _)) + onNext = (nextEvent, _) => Transaction(fireValue(nextEvent, _)), + onError = (nextError, _) => Transaction(fireError(nextError, _)) ) override protected val topoRank: Int = 1 @@ -51,7 +51,7 @@ class ConcurrentStream[A]( case Failure(err) => // @TODO[API] Not 100% sure that we should emit this error, but since // we expect to use signal's current value, I think this is right. - new Transaction(fireError(err, _)) // #Note[onStart,trx,loop] + Transaction(fireError(err, _)) // #Note[onStart,trx,loop] case _ => () } case _ => () diff --git a/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala b/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala index be8dbbe1..8f691356 100644 --- a/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala +++ b/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala @@ -30,7 +30,7 @@ class SwitchSignal[A]( onTry = (nextTry, _) => { //println(s"> init trx from $this SwitchSignal.onValue($nextTry)") innerSignalLastSeenUpdateId = Protected.lastUpdateId(currentSignalTry.get) - new Transaction(fireTry(nextTry, _)) + Transaction(fireTry(nextTry, _)) } ) @@ -104,7 +104,7 @@ class SwitchSignal[A]( // Update this signal's value with nextSignal's current value (or an error if we don't have nextSignal) //println(s"> init trx from SwitchSignal.onTry (new signal)") - new Transaction(trx => { + Transaction { trx => // #Note: Timing is important here. // 1. Create the `trx` transaction, since we need that boundary when flattening @@ -130,7 +130,7 @@ class SwitchSignal[A]( nextSignalTry.foreach(_.addInternalObserver(internalEventObserver, shouldCallMaybeWillStart = false)) } - }) + } } } diff --git a/src/main/scala/com/raquo/airstream/flatten/SwitchSignalStream.scala b/src/main/scala/com/raquo/airstream/flatten/SwitchSignalStream.scala index 5cb2f456..c6fb4e00 100644 --- a/src/main/scala/com/raquo/airstream/flatten/SwitchSignalStream.scala +++ b/src/main/scala/com/raquo/airstream/flatten/SwitchSignalStream.scala @@ -24,14 +24,14 @@ class SwitchSignalStream[A]( private[this] val internalEventObserver: InternalObserver[A] = InternalObserver.fromTry[A]( onTry = (nextTry, _) => { //println(s"> init trx from SwitchSignalStream.onValue($nextTry)") - new Transaction(trx => { + Transaction { trx => if (isStarted) { fireTry(nextTry, trx) maybeCurrentSignalTry.foreach { _.foreach { currentSignal => lastSeenSignalUpdateId = Protected.lastUpdateId(currentSignal) }} } - }) + } } ) @@ -70,7 +70,7 @@ class SwitchSignalStream[A]( lastSeenSignalUpdateId = -1 //println(s"> init trx from SwitchSignalStream.onTry (new signal)") - new Transaction(trx => { + Transaction { trx => if (isStarted) { // #Note: Timing is important here. // 1. Create the `trx` transaction, since we need that boundary when flattening @@ -92,7 +92,7 @@ class SwitchSignalStream[A]( nextSignal.addInternalObserver(internalEventObserver, shouldCallMaybeWillStart = false) } } - }) + } } } @@ -103,12 +103,12 @@ class SwitchSignalStream[A]( val newSignalLastUpdateId = Protected.lastUpdateId(currentSignal) if (newSignalLastUpdateId != lastSeenSignalUpdateId) { //println(s"> init trx from SwitchSignalStream.onTry (same signal)") - new Transaction(trx => { + Transaction { trx => if (isStarted) { fireTry(currentSignal.tryNow(), trx) // #Note[onStart,trx,loop] lastSeenSignalUpdateId = newSignalLastUpdateId } - }) + } } currentSignal.addInternalObserver(internalEventObserver, shouldCallMaybeWillStart = false) }) diff --git a/src/main/scala/com/raquo/airstream/flatten/SwitchStream.scala b/src/main/scala/com/raquo/airstream/flatten/SwitchStream.scala index 12bdeefc..64eb36d1 100644 --- a/src/main/scala/com/raquo/airstream/flatten/SwitchStream.scala +++ b/src/main/scala/com/raquo/airstream/flatten/SwitchStream.scala @@ -43,10 +43,10 @@ class SwitchStream[I, O]( private[this] val internalEventObserver: InternalObserver[O] = InternalObserver[O]( onNext = (nextEvent, _) => { //println(s"> init trx from SwitchEventStream.onValue(${nextEvent})") - new Transaction(fireValue(nextEvent, _)) + Transaction(fireValue(nextEvent, _)) }, onError = (nextError, _) => { - new Transaction(fireError(nextError, _)) + Transaction(fireError(nextError, _)) } ) @@ -113,7 +113,7 @@ class SwitchStream[I, O]( private def switchToNextError(nextError: Throwable, transaction: Option[Transaction]): Unit = { removeInternalObserverFromCurrentEventStream() maybeCurrentEventStreamTry = Failure(nextError) - transaction.fold[Unit](new Transaction(fireError(nextError, _)))(fireError(nextError, _)) // #Note[onStart,trx,loop] + transaction.fold[Unit](Transaction(fireError(nextError, _)))(fireError(nextError, _)) // #Note[onStart,trx,loop] } private def removeInternalObserverFromCurrentEventStream(): Unit = { diff --git a/src/main/scala/com/raquo/airstream/state/Var.scala b/src/main/scala/com/raquo/airstream/state/Var.scala index 3dfb3be6..dbb116d7 100644 --- a/src/main/scala/com/raquo/airstream/state/Var.scala +++ b/src/main/scala/com/raquo/airstream/state/Var.scala @@ -28,7 +28,7 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { val writer: Observer[A] = Observer.fromTry { case nextTry => // Note: `case` syntax needed for Scala 2.12 //println(s"> init trx from Var.writer(${nextTry})") - new Transaction(setCurrentValue(nextTry, _)) + Transaction(setCurrentValue(nextTry, _)) } /** Write values into a Var of Option[V] without manually wrapping in Some() */ @@ -47,21 +47,23 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { * @param mod (currValue, nextInput) => nextValue */ def updater[B](mod: (A, B) => A): Observer[B] = Observer.fromTry { case nextInputTry => - new Transaction(trx => nextInputTry match { - case Success(nextInput) => - tryNow() match { - case Success(currentValue) => - val nextValue = Try(mod(currentValue, nextInput)) // this does catch exceptions in mod - setCurrentValue(nextValue, trx) - case Failure(err) => - AirstreamError.sendUnhandledError( - VarError("Unable to update a failed Var. Consider Var#tryUpdater instead.", cause = Some(err)) - ) - } - - case Failure(err) => - setCurrentValue(Failure[A](err), trx) - }) + Transaction { trx => + nextInputTry match { + case Success(nextInput) => + tryNow() match { + case Success(currentValue) => + val nextValue = Try(mod(currentValue, nextInput)) // this does catch exceptions in mod + setCurrentValue(nextValue, trx) + case Failure(err) => + AirstreamError.sendUnhandledError( + VarError("Unable to update a failed Var. Consider Var#tryUpdater instead.", cause = Some(err)) + ) + } + + case Failure(err) => + setCurrentValue(Failure[A](err), trx) + } + } } // @TODO[Scala3] When we don't need 2.12, remove 'case' from all PartialFunction instances that don't need it (e.g. Observer.fromTry) @@ -70,13 +72,15 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { * Note: Must not throw! */ def tryUpdater[B](mod: (Try[A], B) => Try[A]): Observer[B] = Observer.fromTry { case nextInputTry => - new Transaction(trx => nextInputTry match { - case Success(nextInput) => - val nextValue = mod(getCurrentValue, nextInput) - setCurrentValue(nextValue, trx) - case Failure(err) => - setCurrentValue(Failure[A](err), trx) - }) + Transaction { trx => + nextInputTry match { + case Success(nextInput) => + val nextValue = mod(getCurrentValue, nextInput) + setCurrentValue(nextValue, trx) + case Failure(err) => + setCurrentValue(Failure[A](err), trx) + } + } } def zoom[B](in: A => B)(out: (A, B) => A)(implicit owner: Owner): Var[B] = { @@ -94,7 +98,7 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { * @param mod Note: guarded against exceptions */ def update(mod: A => A): Unit = { - new Transaction(trx => { + Transaction { trx => tryNow() match { case Success(currentValue) => val nextValue = Try(mod(currentValue)) // this does catch exceptions in mod(currentValue) @@ -104,17 +108,16 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { VarError("Unable to update a failed Var. Consider Var#tryUpdate instead.", cause = Some(err)) ) } - - }) + } } /** @param mod Note: must not throw */ def tryUpdate(mod: Try[A] => Try[A]): Unit = { //println(s"> init trx from Var.tryUpdate") - new Transaction(trx => { + Transaction { trx => val nextValue = mod(getCurrentValue) setCurrentValue(nextValue, trx) - }) + } } @inline def tryNow(): Try[A] = signal.tryNow() @@ -162,12 +165,12 @@ object Var { */ def setTry(values: VarTryTuple[_]*): Unit = { //println(s"> init trx from Var.set/setTry") - new Transaction(trx => { + Transaction { trx => if (hasDuplicateVars(values.map(_.tuple))) { throw VarError("Unable to Var.{set,setTry}: the provided list of vars has duplicates. You can't make an observable emit more than one event per transaction.", cause = None) } values.foreach(setTryValue(_, trx)) - }) + } } /** Modify multiple Vars in the same Transaction @@ -176,18 +179,18 @@ object Var { * Mod functions should be PURE. * - If a mod throws, the var will be set to a failed state. * - If you try to update a failed Var, `Var.update` will post an error to unhandled errors, - * and none of the Vars will update. + * and none of the Vars will update. * * Reports an Airstream unhandled error: - * 1) if currentValue of any of the vars is a Failure. - * This is atomic: an exception in any of the vars will prevent any of - * the batched updates in this call from going through. - * 2) if input contains duplicate vars. - * Airstream allows a maximum of one event per observable per transaction. + * 1) if currentValue of any of the vars is a Failure. + * This is atomic: an exception in any of the vars will prevent any of + * the batched updates in this call from going through. + * 2) if input contains duplicate vars. + * Airstream allows a maximum of one event per observable per transaction. */ def update(mods: VarModTuple[_]*): Unit = { //println(s"> init trx from Var.update") - new Transaction(trx => { + Transaction { trx => if (hasDuplicateVars(mods.map(_.tuple))) { throw VarError("Unable to Var.update: the provided list of vars has duplicates. You can't make an observable emit more than one event per transaction.", cause = None) } @@ -195,12 +198,13 @@ object Var { val vars = mods.map(_.tuple._1) try { vars.foreach(_.now()) - } catch { case err: Throwable => - throw VarError("Unable to Var.update a failed Var. Consider Var.tryUpdate instead.", cause = Some(err)) + } catch { + case err: Throwable => + throw VarError("Unable to Var.update a failed Var. Consider Var.tryUpdate instead.", cause = Some(err)) } val tryValues: Seq[VarTryTuple[_]] = tryMods.map(t => tryModToTryTuple(t)) tryValues.foreach(setTryValue(_, trx)) - }) + } } /** Modify multiple Vars in the same Transaction @@ -213,13 +217,13 @@ object Var { */ def tryUpdate(mods: VarTryModTuple[_]*): Unit = { //println(s"> init trx from Var.tryUpdate") - new Transaction(trx => { + Transaction { trx => if (hasDuplicateVars(mods.map(_.tuple))) { throw VarError("Unable to Var.tryUpdate: the provided list of vars has duplicates. You can't make an observable emit more than one event per transaction.", cause = None) } val tryValues: Seq[VarTryTuple[_]] = mods.map(t => tryModToTryTuple(t)) tryValues.foreach(setTryValue(_, trx)) - }) + } } @inline private def toTryTuple[A](varTuple: VarTuple[A]): VarTryTuple[A] = { diff --git a/src/main/scala/com/raquo/airstream/timing/DebounceStream.scala b/src/main/scala/com/raquo/airstream/timing/DebounceStream.scala index e333542d..399d2d3d 100644 --- a/src/main/scala/com/raquo/airstream/timing/DebounceStream.scala +++ b/src/main/scala/com/raquo/airstream/timing/DebounceStream.scala @@ -35,7 +35,7 @@ class DebounceStream[A]( maybeLastTimeoutHandle.foreach(js.timers.clearTimeout) maybeLastTimeoutHandle = js.timers.setTimeout(intervalMs.toDouble) { //println(s"> init trx from DebounceEventStream.onTry($nextValue)") - new Transaction(fireTry(nextValue, _)) + Transaction(fireTry(nextValue, _)) } } diff --git a/src/main/scala/com/raquo/airstream/timing/DelayStream.scala b/src/main/scala/com/raquo/airstream/timing/DelayStream.scala index 67dbe121..be0605e8 100644 --- a/src/main/scala/com/raquo/airstream/timing/DelayStream.scala +++ b/src/main/scala/com/raquo/airstream/timing/DelayStream.scala @@ -22,7 +22,7 @@ class DelayStream[A]( timerHandle = js.timers.setTimeout(delayMs.toDouble) { //println(s"> init trx from DelayEventStream.onNext($nextValue)") timerHandles.splice(timerHandles.indexOf(timerHandle), deleteCount = 1) // Remove handle - new Transaction(fireValue(nextValue, _)) + Transaction(fireValue(nextValue, _)) () } timerHandles.push(timerHandle) @@ -32,7 +32,7 @@ class DelayStream[A]( var timerHandle: SetTimeoutHandle = null timerHandle = js.timers.setTimeout(delayMs.toDouble) { timerHandles.splice(timerHandles.indexOf(timerHandle), deleteCount = 1) // Remove handle - new Transaction(fireError(nextError, _)) + Transaction(fireError(nextError, _)) () } timerHandles.push(timerHandle) diff --git a/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala b/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala index f1d0f82c..10b00046 100644 --- a/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala +++ b/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala @@ -45,6 +45,6 @@ class JsPromiseSignal[A](promise: js.Promise[A]) extends WritableSignal[Option[A // #Note fireTry sets current value even if the signal has no observers val nextValue = nextPromiseValue.map(Some(_)) //println(s"> init trx from FutureSignal($value)") - new Transaction(fireTry(nextValue, _)) // #Note[onStart,trx,async] + Transaction(fireTry(nextValue, _)) // #Note[onStart,trx,async] } } diff --git a/src/main/scala/com/raquo/airstream/timing/JsPromiseStream.scala b/src/main/scala/com/raquo/airstream/timing/JsPromiseStream.scala index 4c6f2f43..1ee6e5b0 100644 --- a/src/main/scala/com/raquo/airstream/timing/JsPromiseStream.scala +++ b/src/main/scala/com/raquo/airstream/timing/JsPromiseStream.scala @@ -33,7 +33,7 @@ class JsPromiseStream[A](promise: js.Promise[A], emitOnce: Boolean) extends Writ (nextValue: A) => { isPending = false //println(s"> init trx from FutureEventStream.init($nextValue)") - new Transaction(fireValue(nextValue, _)) + Transaction(fireValue(nextValue, _)) (): Unit | js.Thenable[Unit] }, js.defined { (rawException: Any) => { @@ -43,7 +43,7 @@ class JsPromiseStream[A](promise: js.Promise[A], emitOnce: Boolean) extends Writ case _ => js.JavaScriptException(rawException) } //println(s"> init trx from JsPromiseEventStream.init($nextError)") - new Transaction(fireError(nextError, _)) + Transaction(fireError(nextError, _)) (): Unit | js.Thenable[Unit] }} ) diff --git a/src/main/scala/com/raquo/airstream/timing/PeriodicStream.scala b/src/main/scala/com/raquo/airstream/timing/PeriodicStream.scala index f0aef351..5f2bf33c 100644 --- a/src/main/scala/com/raquo/airstream/timing/PeriodicStream.scala +++ b/src/main/scala/com/raquo/airstream/timing/PeriodicStream.scala @@ -43,14 +43,14 @@ class PeriodicStream[A]( } private def tick(): Unit = { - new Transaction(trx => { // #Note[onStart,trx,async] + Transaction { trx => // #Note[onStart,trx,async] if (isStarted) { // This cycle should also be broken by clearTimeout() in onStop, // but just in case of some weird timing I put isStarted check here. fireValue(currentValue, trx) setNext() } - }) + } } private def setNext(): Unit = { @@ -63,7 +63,7 @@ class PeriodicStream[A]( case Success(None) => resetTo(initial, tickNext = false) case Failure(err) => - new Transaction(fireError(err, _)) // #Note[onStart,trx,async] + Transaction(fireError(err, _)) // #Note[onStart,trx,async] } } diff --git a/src/main/scala/com/raquo/airstream/timing/ThrottleStream.scala b/src/main/scala/com/raquo/airstream/timing/ThrottleStream.scala index 33fb2521..d6a476c3 100644 --- a/src/main/scala/com/raquo/airstream/timing/ThrottleStream.scala +++ b/src/main/scala/com/raquo/airstream/timing/ThrottleStream.scala @@ -53,7 +53,7 @@ class ThrottleStream[A]( maybeFirstTimeoutHandle = js.timers.setTimeout(0) { maybeFirstTimeoutHandle = js.undefined //println(s"> init trx from leading ThrottleEventStream.onTry($nextValue)") - new Transaction(fireTry(nextValue, _)) + Transaction(fireTry(nextValue, _)) } } else { @@ -62,7 +62,7 @@ class ThrottleStream[A]( maybeLastTimeoutHandle = js.timers.setTimeout(remainingMs.toDouble) { lastEmittedEventMs = js.Date.now() // @TODO Should this fire now, or inside the transaction below? //println(s"> init trx from ThrottleEventStream.onTry($nextValue)") - new Transaction(fireTry(nextValue, _)) + Transaction(fireTry(nextValue, _)) } } } diff --git a/src/main/scala/com/raquo/airstream/web/AjaxStream.scala b/src/main/scala/com/raquo/airstream/web/AjaxStream.scala index 06c0965c..a8e19ff9 100644 --- a/src/main/scala/com/raquo/airstream/web/AjaxStream.scala +++ b/src/main/scala/com/raquo/airstream/web/AjaxStream.scala @@ -80,9 +80,9 @@ class AjaxStream( maybePendingRequest = js.undefined val status = request.status if (isStatusCodeSuccess(status)) - new Transaction(fireValue(request, _)) + Transaction(fireValue(request, _)) else - new Transaction(fireError(AjaxStatusError(request, status, s"Ajax request failed: $status ${request.statusText}"), _)) + Transaction(fireError(AjaxStatusError(request, status, s"Ajax request failed: $status ${request.statusText}"), _)) } } @@ -94,21 +94,21 @@ class AjaxStream( // - `ev` is not actually a dom.ErrorEvent, but a useless dom.ProgressEvent // - Reasons could be network, DNS, CORS, etc. - new Transaction(fireError(AjaxNetworkError(request, s"Ajax request failed: unknown network reason."), _)) + Transaction(fireError(AjaxNetworkError(request, s"Ajax request failed: unknown network reason."), _)) } } request.onabort = (_: js.Any) => { if (maybePendingRequest.contains(request)) { maybePendingRequest = js.undefined - new Transaction(fireError(AjaxAbort(request), _)) + Transaction(fireError(AjaxAbort(request), _)) } } request.ontimeout = (_: dom.Event) => { if (maybePendingRequest.contains(request)) { maybePendingRequest = js.undefined - new Transaction(fireError(AjaxTimeout(request), _)) + Transaction(fireError(AjaxTimeout(request), _)) } } diff --git a/src/main/scala/com/raquo/airstream/web/FetchStream.scala b/src/main/scala/com/raquo/airstream/web/FetchStream.scala index 1bfc67ca..3995136c 100644 --- a/src/main/scala/com/raquo/airstream/web/FetchStream.scala +++ b/src/main/scala/com/raquo/airstream/web/FetchStream.scala @@ -143,7 +143,7 @@ class FetchStream private[web] ( maybeAbortStream.map { _ => InternalObserver[Any]( onNext = (_, _) => maybeAbortController.get.abort(), - onError = (err, _) => new Transaction(fireError(err, _)) + onError = (err, _) => Transaction(fireError(err, _)) ) } } @@ -156,7 +156,7 @@ class FetchStream private[web] ( val responsePromise = dom.Fetch.fetch(url, requestInit) // #TODO[Integrity] Is it ok to emit asynchronously here? Maybe we should save `responsePromise` and emit it `onStart`? js.timers.setTimeout(0) { - new Transaction(fireValue(responsePromise, _)) + Transaction(fireValue(responsePromise, _)) } hasEmittedEvents = true } diff --git a/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala b/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala index 5ad56f06..f72d9165 100644 --- a/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala @@ -416,15 +416,15 @@ class PullResetSignalSpec extends UnitSpec { v.set(11) - // With `new Transaction`, these could be delayed if we're already + // With `Transaction { ... }`, these could be delayed if we're already // inside a transaction (in the test, we aren't), and also we can't // return the value from inside the transaction (due to the potential // delay) - new Transaction(_ => { + Transaction { _ => isPositive.addObserver(Observer.empty)(owner) isEven.addObserver(Observer.empty)(owner) combined.addObserver(Observer.empty)(owner) - }) + } log.toList shouldBe List( "11 isPositive = true", @@ -633,12 +633,12 @@ class PullResetSignalSpec extends UnitSpec { AirstreamError.unregisterUnhandledErrorCallback(AirstreamError.consoleErrorCallback) try { - new Transaction(_ => { + Transaction { _ => isPositive.addObserver(Observer.empty)(owner) isEven.addObserver(Observer.empty)(owner) throw new Exception(sharedBlockErrorMsg) // combined.addObserver(Observer.empty)(owner) - }) + } log.toList shouldBe List( "11 isPositive = true", @@ -653,9 +653,9 @@ class PullResetSignalSpec extends UnitSpec { // -- - new Transaction(_ => { + Transaction { _ => combined.addObserver(Observer.empty)(owner) - }) + } v.set(-12) diff --git a/src/test/scala/com/raquo/airstream/core/TransactionSpec.scala b/src/test/scala/com/raquo/airstream/core/TransactionSpec.scala index fbf5f1ac..14320b5b 100644 --- a/src/test/scala/com/raquo/airstream/core/TransactionSpec.scala +++ b/src/test/scala/com/raquo/airstream/core/TransactionSpec.scala @@ -77,9 +77,9 @@ class TransactionSpec extends UnitSpec { bus.events.foreach { num => if (num % 2 == 0) { - new Transaction(_ => { + Transaction { _ => throw new Exception("Random error in transaction") - }) + } } else { log.update(_ :+ num) } diff --git a/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala b/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala index c56e2cd5..6f1c3af4 100644 --- a/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala @@ -685,7 +685,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { }(owner) ) // #Note: Test that our dropping logic works does not break events scheduled after transaction boundary - new Transaction(_ => { + Transaction { _ => DynamicSubscription.subscribeCallback( innerDynamicOwner, owner => fooSignal.foreach { foo => @@ -693,7 +693,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { effects += Effect(s"new-trx-update-child-$key", foo.id + "-" + foo.version.toString) }(owner) ) - }) + } Bar(key) })