Skip to content

Commit

Permalink
rust sdk: simplify process_message
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril committed Feb 20, 2025
1 parent 7e749b0 commit c359d67
Showing 1 changed file with 40 additions and 53 deletions.
93 changes: 40 additions & 53 deletions crates/sdk/src/db_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,40 +144,25 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
// then invoke the on-applied and row callbacks.
// We only use this for `subscribe_from_all_tables`
ParsedMessage::InitialSubscription { db_update, sub_id } => {
// Lock the client cache in a restricted scope,
// so that it will be unlocked when callbacks run.
let applied_diff = {
let mut cache = self.cache.lock().unwrap();
db_update.apply_to_client_cache(&mut *cache)
};
let mut inner = self.inner.lock().unwrap();

let sub_event_ctx = self.make_event_ctx(());
inner.subscriptions.legacy_subscription_applied(&sub_event_ctx, sub_id);

let row_event_ctx = self.make_event_ctx(Event::SubscribeApplied);
applied_diff.invoke_row_callbacks(&row_event_ctx, &mut inner.db_callbacks);
self.apply_update(db_update, |inner| {
let sub_event_ctx = self.make_event_ctx(());
inner.subscriptions.legacy_subscription_applied(&sub_event_ctx, sub_id);
Event::SubscribeApplied
});
Ok(())
}

// Successful transaction update:
// apply the received diff to the client cache,
// then invoke on-reducer and row callbacks.
ParsedMessage::TransactionUpdate(event, Some(update)) => {
// Lock the client cache in a restricted scope,
// so that it will be unlocked when callbacks run.
let applied_diff = {
let mut cache = self.cache.lock().unwrap();
update.apply_to_client_cache(&mut *cache)
};
let mut inner = self.inner.lock().unwrap();
if let Event::Reducer(reducer_event) = &event {
let reducer_event_ctx = self.make_event_ctx(reducer_event.clone());
inner.reducer_callbacks.invoke_on_reducer(&reducer_event_ctx);
}

let row_event_ctx = self.make_event_ctx(event);
applied_diff.invoke_row_callbacks(&row_event_ctx, &mut inner.db_callbacks);
self.apply_update(update, |inner| {
if let Event::Reducer(reducer_event) = &event {
let reducer_event_ctx = self.make_event_ctx(reducer_event.clone());
inner.reducer_callbacks.invoke_on_reducer(&reducer_event_ctx);
}
event
});
Ok(())
}

Expand All @@ -195,38 +180,22 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
query_id,
initial_update,
} => {
// Lock the client cache in a restricted scope,
// so that it will be unlocked when callbacks run.
let applied_diff = {
let mut cache = self.cache.lock().unwrap();
initial_update.apply_to_client_cache(&mut *cache)
};
let mut inner = self.inner.lock().unwrap();

let sub_event_ctx = self.make_event_ctx(());
inner.subscriptions.subscription_applied(&sub_event_ctx, query_id);

let row_event_ctx = self.make_event_ctx(Event::SubscribeApplied);
applied_diff.invoke_row_callbacks(&row_event_ctx, &mut inner.db_callbacks);
self.apply_update(initial_update, |inner| {
let sub_event_ctx = self.make_event_ctx(());
inner.subscriptions.subscription_applied(&sub_event_ctx, query_id);
Event::SubscribeApplied
});
Ok(())
}
ParsedMessage::UnsubscribeApplied {
query_id,
initial_update,
} => {
// Lock the client cache in a restricted scope,
// so that it will be unlocked when callbacks run.
let applied_diff = {
let mut cache = self.cache.lock().unwrap();
initial_update.apply_to_client_cache(&mut *cache)
};
let mut inner = self.inner.lock().unwrap();

let sub_event_ctx = self.make_event_ctx(());
inner.subscriptions.unsubscribe_applied(&sub_event_ctx, query_id);

let row_event_ctx = self.make_event_ctx(Event::UnsubscribeApplied);
applied_diff.invoke_row_callbacks(&row_event_ctx, &mut inner.db_callbacks);
self.apply_update(initial_update, |inner| {
let sub_event_ctx = self.make_event_ctx(());
inner.subscriptions.unsubscribe_applied(&sub_event_ctx, query_id);
Event::UnsubscribeApplied
});
Ok(())
}
ParsedMessage::SubscriptionError { query_id, error } => {
Expand All @@ -246,6 +215,24 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
res
}

fn apply_update(
&self,
update: M::DbUpdate,
get_event: impl FnOnce(&mut DbContextImplInner<M>) -> Event<M::Reducer>,
) {
// Lock the client cache in a restricted scope,
// so that it will be unlocked when callbacks run.
let applied_diff = {
let mut cache = self.cache.lock().unwrap();
update.apply_to_client_cache(&mut *cache)
};
let mut inner = self.inner.lock().unwrap();

let event = get_event(&mut inner);
let row_event_ctx = self.make_event_ctx(event);
applied_diff.invoke_row_callbacks(&row_event_ctx, &mut inner.db_callbacks);
}

/// Invoke the on-disconnect callback, and mark [`Self::is_active`] false.
fn invoke_disconnected(&self, ctx: &M::ErrorContext) {
let mut inner = self.inner.lock().unwrap();
Expand Down

0 comments on commit c359d67

Please sign in to comment.