Skip to content

Commit

Permalink
fix: correct thread-local behavior for Effects (closes #2754)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbj committed Aug 1, 2024
1 parent a497d05 commit b05581c
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 125 deletions.
267 changes: 146 additions & 121 deletions reactive_graph/src/effect/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
WithObserver,
},
owner::{LocalStorage, Owner, StoredValue},
owner::{LocalStorage, Owner, Storage, StoredValue, SyncStorage},
traits::Dispose,
};
use any_spawner::Executor;
Expand Down Expand Up @@ -72,13 +72,17 @@ use std::{
/// this with a web framework, this generally means that effects **do not run on the server**.
/// and you can call browser-specific APIs within the effect function without causing issues.
/// If you need an effect to run on the server, use [`Effect::new_isomorphic`].
pub struct Effect {
inner: StoredValue<Option<Arc<RwLock<EffectInner>>>, LocalStorage>,
pub struct Effect<S> {
inner: Option<StoredValue<StoredEffect, S>>,
}

impl Dispose for Effect {
type StoredEffect = Option<Arc<RwLock<EffectInner>>>;

impl<S> Dispose for Effect<S> {
fn dispose(self) {
self.inner.dispose()
if let Some(inner) = self.inner {
inner.dispose()
}
}
}

Expand All @@ -100,12 +104,22 @@ fn effect_base() -> (Receiver, Owner, Arc<RwLock<EffectInner>>) {
(rx, owner, inner)
}

impl Effect {
impl<S> Effect<S>
where
S: Storage<StoredEffect>,
{
/// Stops this effect before it is disposed.
pub fn stop(self) {
drop(self.inner.try_update_value(|inner| inner.take()));
if let Some(inner) = self
.inner
.and_then(|this| this.try_update_value(|inner| inner.take()))
{
drop(inner);
}
}
}

impl Effect<LocalStorage> {
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
Expand All @@ -116,11 +130,11 @@ impl Effect {
where
T: 'static,
{
let (mut rx, owner, inner) = effect_base();
let value = Arc::new(RwLock::new(None::<T>));
let mut first_run = true;
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let value = Arc::new(RwLock::new(None::<T>));
let mut first_run = true;

if cfg!(feature = "effects") {
Executor::spawn_local({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();
Expand All @@ -145,100 +159,11 @@ impl Effect {
}
}
});
}

Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
}

/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This spawns a task that can be run on any thread. For an effect that will be spawned on
/// the current thread, use [`new`](Effect::new).
pub fn new_sync<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));

if cfg!(feature = "effects") {
Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();

async move {
while rx.next().await.is_some() {
if first_run
|| subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
first_run = false;
subscriber.clear_sources(&subscriber);

let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});
}

Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
}

/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This will run whether the `effects` feature is enabled or not.
pub fn new_isomorphic<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));

Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();

async move {
while rx.next().await.is_some() {
if first_run
|| subscriber
.with_observer(|| subscriber.update_if_necessary())
{
first_run = false;
subscriber.clear_sources(&subscriber);

let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
StoredValue::new_with_storage(Some(inner))
});
Self {
inner: StoredValue::new_with_storage(Some(inner)),
}

Self { inner }
}

/// A version of [`Effect::new`] that only listens to any dependency
Expand Down Expand Up @@ -340,12 +265,12 @@ impl Effect {
D: 'static,
T: 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));

if cfg!(feature = "effects") {
Executor::spawn_local({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
Expand Down Expand Up @@ -390,10 +315,102 @@ impl Effect {
}
}
});
}

StoredValue::new_with_storage(Some(inner))
});

Self { inner }
}
}

impl Effect<SyncStorage> {
/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This spawns a task that can be run on any thread. For an effect that will be spawned on
/// the current thread, use [`new`](Effect::new).
pub fn new_sync<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let inner = cfg!(feature = "effects").then(|| {
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));

Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();

async move {
while rx.next().await.is_some() {
if first_run
|| subscriber.with_observer(|| {
subscriber.update_if_necessary()
})
{
first_run = false;
subscriber.clear_sources(&subscriber);

let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});

StoredValue::new_with_storage(Some(inner))
});

Self { inner }
}

/// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values
/// that are read inside it change.
///
/// This will run whether the `effects` feature is enabled or not.
pub fn new_isomorphic<T>(
mut fun: impl FnMut(Option<T>) -> T + Send + Sync + 'static,
) -> Self
where
T: Send + Sync + 'static,
{
let (mut rx, owner, inner) = effect_base();
let mut first_run = true;
let value = Arc::new(RwLock::new(None::<T>));

Executor::spawn({
let value = Arc::clone(&value);
let subscriber = inner.to_any_subscriber();

async move {
while rx.next().await.is_some() {
if first_run
|| subscriber
.with_observer(|| subscriber.update_if_necessary())
{
first_run = false;
subscriber.clear_sources(&subscriber);

let old_value =
mem::take(&mut *value.write().or_poisoned());
let new_value = owner.with_cleanup(|| {
subscriber.with_observer(|| fun(old_value))
});
*value.write().or_poisoned() = Some(new_value);
}
}
}
});

Self {
inner: StoredValue::new_with_storage(Some(inner)),
inner: Some(StoredValue::new_with_storage(Some(inner))),
}
}

Expand All @@ -415,7 +432,7 @@ impl Effect {
let dep_value = Arc::new(RwLock::new(None::<D>));
let watch_value = Arc::new(RwLock::new(None::<T>));

if cfg!(feature = "effects") {
let inner = cfg!(feature = "effects").then(|| {
Executor::spawn({
let dep_value = Arc::clone(&dep_value);
let watch_value = Arc::clone(&watch_value);
Expand Down Expand Up @@ -460,22 +477,28 @@ impl Effect {
}
}
});
}

Self {
inner: StoredValue::new_with_storage(Some(inner)),
}
StoredValue::new_with_storage(Some(inner))
});

Self { inner }
}
}

impl ToAnySubscriber for Effect {
impl<S> ToAnySubscriber for Effect<S>
where
S: Storage<StoredEffect>,
{
fn to_any_subscriber(&self) -> AnySubscriber {
self.inner
.try_with_value(|inner| {
inner.as_ref().map(|inner| inner.to_any_subscriber())
.and_then(|inner| {
inner
.try_with_value(|inner| {
inner.as_ref().map(|inner| inner.to_any_subscriber())
})
.flatten()
})
.flatten()
.expect("tried to subscribe to effect that has been stopped")
.expect("tried to set effect that has been stopped")
}
}

Expand All @@ -484,7 +507,9 @@ impl ToAnySubscriber for Effect {
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust \
idioms.Please use `Effect::new()` instead."]
pub fn create_effect<T>(fun: impl FnMut(Option<T>) -> T + 'static) -> Effect
pub fn create_effect<T>(
fun: impl FnMut(Option<T>) -> T + 'static,
) -> Effect<LocalStorage>
where
T: 'static,
{
Expand Down
9 changes: 5 additions & 4 deletions reactive_graph/src/effect/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use crate::{
use or_poisoned::OrPoisoned;
use std::sync::{Arc, RwLock, Weak};

/// Handles internal subscription logic for effects.
#[derive(Debug)]
pub(crate) struct EffectInner {
pub dirty: bool,
pub observer: Sender,
pub sources: SourceSet,
pub struct EffectInner {
pub(crate) dirty: bool,
pub(crate) observer: Sender,
pub(crate) sources: SourceSet,
}

impl ToAnySubscriber for Arc<RwLock<EffectInner>> {
Expand Down

0 comments on commit b05581c

Please sign in to comment.