Skip to content

Commit

Permalink
API: Replace flatMap method with flatMapSwitch etc. Fixes #110
Browse files Browse the repository at this point in the history
  • Loading branch information
raquo committed Nov 22, 2023
1 parent e3aab7e commit e8701e5
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 65 deletions.
43 changes: 38 additions & 5 deletions src/main/scala/com/raquo/airstream/core/BaseObservable.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.raquo.airstream.core

import com.raquo.airstream.debug.Debugger
import com.raquo.airstream.flatten.FlattenStrategy
import com.raquo.airstream.flatten.{AllowFlatMap, FlattenStrategy, MergingStrategy, SwitchingStrategy}
import com.raquo.airstream.ownership.{Owner, Subscription}
import com.raquo.ew.JsArray

Expand Down Expand Up @@ -53,11 +53,44 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name

def mapToUnit: Self[Unit] = map(_ => ())

/** @param compose Note: guarded against exceptions */
@inline def flatMap[B, Inner[_], Output[+_] <: Observable[_]](compose: A => Inner[B])(
implicit strategy: FlattenStrategy[Self, Inner, Output]
/** #WARNING: DO NOT USE THIS METHOD.
* See https://github.com/raquo/Airstream/#flattening-observables
*/
@inline def flatMap[B, Inner[_], Output[+_] <: Observable[_]](
project: A => Inner[B]
)(
implicit strategy: SwitchingStrategy[Self, Inner, Output],
allowFlatMap: AllowFlatMap
): Output[B] = {
strategy.flatten(map(project))
}

/** Alias to flatMapSwitch(_ => s) */
@inline def flatMapTo[B, Inner[_], Output[+_] <: Observable[_]](s: Inner[B])(
implicit strategy: SwitchingStrategy[Self, Inner, Output]
): Output[B] = {
strategy.flatten(map(_ => s))
}

/** @param project Note: guarded against exceptions */
@inline def flatMapSwitch[B, Inner[_], Output[+_] <: Observable[_]](project: A => Inner[B])(
implicit strategy: SwitchingStrategy[Self, Inner, Output]
): Output[B] = {
strategy.flatten(map(project))
}

/** @param project Note: guarded against exceptions */
@inline def flatMapMerge[B, Inner[_], Output[+_] <: Observable[_]](project: A => Inner[B])(
implicit strategy: MergingStrategy[Self, Inner, Output]
): Output[B] = {
strategy.flatten(map(project))
}

/** @param project Note: guarded against exceptions */
@inline def flatMapCustom[B, Inner[_], Output[+_] <: Observable[_]](project: A => Inner[B])(
strategy: FlattenStrategy[Self, Inner, Output]
): Output[B] = {
strategy.flatten(map(compose))
strategy.flatten(map(project))
}

/** Distinct events (but keep all errors) by == (equals) comparison */
Expand Down
33 changes: 27 additions & 6 deletions src/main/scala/com/raquo/airstream/core/Observable.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.raquo.airstream.core

import com.raquo.airstream.debug.DebuggableObservable
import com.raquo.airstream.flatten.FlattenStrategy
import com.raquo.airstream.flatten.FlattenStrategy._
import com.raquo.airstream.flatten.{AllowFlatten, FlattenStrategy, MergingStrategy, SwitchingStrategy}

// @TODO[Scala3] Put this trait together with BaseObservable in the same file, and make BaseObservable sealed.

Expand All @@ -23,20 +23,41 @@ object Observable extends ObservableLowPriorityImplicits {
) extends AnyVal {

@inline def flatten[Output[+_] <: Observable[_]](
implicit strategy: FlattenStrategy[Outer, Inner, Output]
implicit strategy: SwitchingStrategy[Outer, Inner, Output],
allowFlatMap: AllowFlatten
): Output[A] = {
strategy.flatten(parent)
}

@inline def flattenSwitch[Output[+_] <: Observable[_]](
implicit strategy: SwitchingStrategy[Outer, Inner, Output]
): Output[A] = {
strategy.flatten(parent)
}

@inline def flattenMerge[Output[+_] <: Observable[_]](
implicit strategy: MergingStrategy[Outer, Inner, Output]
): Output[A] = {
strategy.flatten(parent)
}

@inline def flattenCustom[Output[+_] <: Observable[_]](
strategy: FlattenStrategy[Outer, Inner, Output]
): Output[A] = {
strategy.flatten(parent)
}
}

implicit val switchStreamStrategy: FlattenStrategy[Observable, EventStream, EventStream] = SwitchStreamStrategy
implicit val switchStreamStrategy: SwitchingStrategy[Observable, EventStream, EventStream] = SwitchStreamStrategy

implicit val switchSignalStreamStrategy: SwitchingStrategy[EventStream, Signal, EventStream] = SwitchSignalStreamStrategy

implicit val switchSignalStreamStrategy: FlattenStrategy[EventStream, Signal, EventStream] = SwitchSignalStreamStrategy
implicit val switchSignalStrategy: SwitchingStrategy[Signal, Signal, Signal] = SwitchSignalStrategy

implicit val switchSignalStrategy: FlattenStrategy[Signal, Signal, Signal] = SwitchSignalStrategy
implicit val mergeStreamsStrategy: MergingStrategy[Observable, EventStream, EventStream] = ConcurrentStreamStrategy
}

trait ObservableLowPriorityImplicits {

implicit val switchSignalObservableStrategy: FlattenStrategy[Observable, Signal, Observable] = SwitchSignalObservableStrategy
implicit val switchSignalObservableStrategy: SwitchingStrategy[Observable, Signal, Observable] = SwitchSignalObservableStrategy
}
Empty file.
46 changes: 39 additions & 7 deletions src/main/scala/com/raquo/airstream/flatten/FlattenStrategy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,75 @@ package com.raquo.airstream.flatten

import com.raquo.airstream.core.{EventStream, Observable, Signal}

/** [[Observable.MetaObservable.flatten]] needs an instance of this trait to know how exactly to do the flattening. */
trait FlattenStrategy[-Outer[+_] <: Observable[_], -Inner[_], +Output[+_] <: Observable[_]] {
import scala.annotation.implicitNotFound

/** [[Observable.MetaObservable.switchFlatten]] needs an instance of this trait to know how exactly to do the flattening. */
trait FlattenStrategy[
-Outer[+_] <: Observable[_],
-Inner[_],
+Output[+_] <: Observable[_]
] {
/** Must not throw */
def flatten[A](parent: Outer[Inner[A]]): Output[A]
}

/** Flatten strategies with semantics of mirroring the latest emitted observable. */
trait SwitchingStrategy[
-Outer[+_] <: Observable[_],
-Inner[_],
+Output[+_] <: Observable[_]
] extends FlattenStrategy[Outer, Inner, Output]

/** Flatten strategies with semantics of merging all of the emitted observables. */
trait MergingStrategy[
-Outer[+_] <: Observable[_],
-Inner[_],
+Output[+_] <: Observable[_]
] extends FlattenStrategy[Outer, Inner, Output]

@implicitNotFound("\nYou are trying to use Airstream's flatMap operator.\nIt was renamed to flatMapSwitch / flatMapMerge to discourage incorrect usage, especially in for-comprehensions.\nSee https://github.com/raquo/Airstream/#flattening-observables\n\n")
trait AllowFlatMap

@implicitNotFound("\nYou are trying to use Airstream's flatten operator.\nIt was renamed to flattenSwitch / flattenMerge to clarify intent and discourage incorrect usage, similarly to flatMap.\nSee https://github.com/raquo/Airstream/#flattening-observables\n\n")
trait AllowFlatten

object FlattenStrategy {

@deprecated("You are using Airstream's deprecated flatMap operator using FlattenStrategy.flatMapAllowed import. This migration helper will be removed in the next version of Airstream. See https://github.com/raquo/Airstream/#flattening-observables", since = "17.0.0")
implicit lazy val allowFlatMap: AllowFlatMap = new AllowFlatMap {}

@deprecated("You are using Airstream's deprecated flatten operator using FlattenStrategy.flattenAllowed import. This migration helper will be removed in the next version of Airstream. See https://github.com/raquo/Airstream/#flattening-observables", since = "17.0.0")
implicit lazy val allowFlatten: AllowFlatten = new AllowFlatten {}

/** See docs for [[SwitchStream]] */
object SwitchStreamStrategy extends FlattenStrategy[Observable, EventStream, EventStream] {
object SwitchStreamStrategy extends SwitchingStrategy[Observable, EventStream, EventStream] {
override def flatten[A](parent: Observable[EventStream[A]]): EventStream[A] = {
new SwitchStream[EventStream[A], A](parent = parent, makeStream = identity)
}
}

/** See docs for [[ConcurrentStream]] */
object ConcurrentStreamStrategy extends FlattenStrategy[Observable, EventStream, EventStream] {
object ConcurrentStreamStrategy extends MergingStrategy[Observable, EventStream, EventStream] {
override def flatten[A](parent: Observable[EventStream[A]]): EventStream[A] = {
new ConcurrentStream[A](parent = parent)
}
}

/** See docs for [[SwitchSignalStream]] */
object SwitchSignalStreamStrategy extends FlattenStrategy[EventStream, Signal, EventStream] {
object SwitchSignalStreamStrategy extends SwitchingStrategy[EventStream, Signal, EventStream] {
override def flatten[A](parent: EventStream[Signal[A]]): EventStream[A] = {
new SwitchSignalStream(parent)
}
}

/** See docs for [[SwitchSignal]] */
object SwitchSignalStrategy extends FlattenStrategy[Signal, Signal, Signal] {
object SwitchSignalStrategy extends SwitchingStrategy[Signal, Signal, Signal] {
override def flatten[A](parent: Signal[Signal[A]]): Signal[A] = {
new SwitchSignal(parent)
}
}

object SwitchSignalObservableStrategy extends FlattenStrategy[Observable, Signal, Observable] {
object SwitchSignalObservableStrategy extends SwitchingStrategy[Observable, Signal, Observable] {
override def flatten[A](parent: Observable[Signal[A]]): Observable[A] = {
parent.matchStreamOrSignal(
ifStream = SwitchSignalStreamStrategy.flatten,
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/raquo/airstream/web/FetchStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class FetchBuilder[In, Out](
maybeAbortStream,
shouldAbortOnStop,
emitOnce
).flatMap { promise =>
EventStream.fromJsPromise(promise).flatMap(decodeResponse)
).flatMapSwitch { promise =>
EventStream.fromJsPromise(promise).flatMapSwitch(decodeResponse)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/com/raquo/airstream/core/GlitchSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ class GlitchSpec extends UnitSpec {
val stateVar = Var(State(Nil))

var n = 0
val actions: EventStream[Action] = clickBus.events.flatMap { _ =>
val actions: EventStream[Action] = clickBus.events.flatMapSwitch { _ =>
n += 2
EventStream.merge(
EventStream.fromValue(n - 2, emitOnce = true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ class PullResetSignalSpec extends UnitSpec {
case _ => smallSignal
}
.setDisplayName("MetaSignal")
.flatten
.flattenSwitch
.setDisplayName("FlatSignal")
.map(Calculation.log("flat", calculations))
.setDisplayName("FlatSignal--LOG")
Expand Down Expand Up @@ -1136,7 +1136,7 @@ class PullResetSignalSpec extends UnitSpec {
EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true).setDisplayName("BigSeqStream")
).setDisplayName("BigMergeStream").startWith("big-0").setDisplayName("BigSignal")

val flatSignal = outerBus.events.startWith(0).setDisplayName("OuterBus.startWith").flatMap {
val flatSignal = outerBus.events.startWith(0).setDisplayName("OuterBus.startWith").flatMapSwitch {
case i if i >= 10 => bigSignal
case _ => smallSignal
}.setDisplayName("FlatSignal").map(Calculation.log("flat", calculations)).setDisplayName("FlatSignal--LOG")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class EventStreamErrorSpec extends UnitSpec with BeforeAndAfter {

val bus = new EventBus[Int]

val stream = bus.events.flatMap(EventStream.fromValue(_, emitOnce = true))
val stream = bus.events.flatMapSwitch(EventStream.fromValue(_, emitOnce = true))

val effects = mutable.Buffer[Effect[_]]()

Expand Down Expand Up @@ -387,7 +387,7 @@ class EventStreamErrorSpec extends UnitSpec with BeforeAndAfter {

val myVar = Var.fromTry[Int](Failure(err))

val stream = myVar.signal.flatMap(EventStream.fromValue(_, emitOnce = true))
val stream = myVar.signal.flatMapSwitch(EventStream.fromValue(_, emitOnce = true))

val effects = mutable.Buffer[Effect[_]]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ class SignalErrorSpec extends UnitSpec with BeforeAndAfter {

val myVar = Var(0)

val stream = myVar.signal.flatMap(Val(_))
val stream = myVar.signal.flatMapSwitch(Val(_))

val effects = mutable.Buffer[Effect[_]]()

Expand Down Expand Up @@ -400,7 +400,7 @@ class SignalErrorSpec extends UnitSpec with BeforeAndAfter {
val myVar = Var.fromTry[Int](Failure(err))

// @TODO[Airstream] Add Signal.fromValue / fromTry that creates a Val
val stream = myVar.signal.flatMap(Val(_))
val stream = myVar.signal.flatMapSwitch(Val(_))

val effects = mutable.Buffer[Effect[_]]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class EventStreamFlattenFutureSpec extends AsyncUnitSpec {
val promise5 = makePromise()

val futureBus = new EventBus[Future[Int]]()
val stream = futureBus.events.flatMap(EventStream.fromFuture(_))
val stream = futureBus.events.flatMapSwitch(EventStream.fromFuture(_))

stream.addObserver(obs)

Expand Down
Loading

0 comments on commit e8701e5

Please sign in to comment.