Skip to content

Commit

Permalink
Implemented error arguments in error callbacks (#2229)
Browse files Browse the repository at this point in the history
  • Loading branch information
cloutiertyler authored Feb 11, 2025
1 parent 6727e05 commit 03fda01
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 70 deletions.
2 changes: 1 addition & 1 deletion crates/cli/src/subcommands/generate/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ impl<Ctx: __sdk::DbContext<
define_event_context(
out,
"ErrorContext",
Some("__sdk::Error"),
Some("Option<__sdk::Error>"),
"[`__sdk::DbConnectionBuilder::on_disconnect`], [`__sdk::DbConnectionBuilder::on_connect_error`] and [`__sdk::SubscriptionBuilder::on_error`] callbacks",
Some("[`__sdk::Error`]"),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/cli/tests/snapshots/codegen__codegen_rust.snap
Original file line number Diff line number Diff line change
Expand Up @@ -2135,12 +2135,12 @@ pub struct ErrorContext {
/// This type is currently unstable and may be removed without a major version bump.
pub set_reducer_flags: SetReducerFlags,
/// The event which caused these callbacks to run.
pub event: __sdk::Error,
pub event: Option<__sdk::Error>,
imp: __sdk::DbContextImpl<RemoteModule>,
}

impl __sdk::AbstractEventContext for ErrorContext {
type Event = __sdk::Error;
type Event = Option<__sdk::Error>;
fn event(&self) -> &Self::Event {
&self.event
}
Expand Down
10 changes: 5 additions & 5 deletions crates/sdk/examples/quickstart-chat/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ fn on_message_sent(ctx: &ReducerEventContext, text: &String) {
// ## Exit when disconnected

/// Our `on_disconnect` callback: print a note, then exit the process.
fn on_disconnected(ctx: &ErrorContext) {
match &ctx.event {
Error::Disconnected => {
fn on_disconnected(_ctx: &ErrorContext, error: Option<Error>) {
match error {
None => {
println!("Disconnected normally.");
std::process::exit(0)
}
err => panic!("Disconnected abnormally: {err}"),
Some(err) => panic!("Disconnected abnormally: {err}"),
}
}

Expand All @@ -161,7 +161,7 @@ const DB_NAME: &str = "quickstart-chat";
fn connect_to_db() -> DbConnection {
DbConnection::builder()
.on_connect(on_connected)
.on_connect_error(|ctx| panic!("Error while connecting: {}", ctx.event))
.on_connect_error(|_ctx, error| panic!("Error while connecting: {}", error))
.on_disconnect(on_disconnected)
.with_token(creds_store().load().expect("Error loading credentials"))
.with_module_name(DB_NAME)
Expand Down
4 changes: 2 additions & 2 deletions crates/sdk/examples/quickstart-chat/module_bindings/mod.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 23 additions & 10 deletions crates/sdk/src/db_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
let res = match msg {
// Error: treat this as an erroneous disconnect.
ParsedMessage::Error(e) => {
let disconnect_ctx = self.make_event_ctx(e.clone());
let disconnect_ctx = self.make_event_ctx(Some(e.clone()));
self.invoke_disconnected(&disconnect_ctx);
Err(e)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
}
ParsedMessage::SubscriptionError { query_id, error } => {
let error = crate::Error::SubscriptionError { error };
let ctx = self.make_event_ctx(error);
let ctx = self.make_event_ctx(Some(error));
let Some(query_id) = query_id else {
// A subscription error that isn't specific to a query is a fatal error.
self.invoke_disconnected(&ctx);
Expand All @@ -260,7 +260,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {

// Grap the `on_disconnect` callback and invoke it.
if let Some(disconnect_callback) = inner.on_disconnect.take() {
disconnect_callback(ctx);
disconnect_callback(ctx, ctx.event().clone());
}

// Call the `on_disconnect` method for all subscriptions.
Expand Down Expand Up @@ -481,9 +481,18 @@ impl<M: SpacetimeModule> DbContextImpl<M> {

// Deranged behavior: mpsc's `try_next` returns `Ok(None)` when the channel is closed,
// and `Err(_)` when the channel is open and waiting. This seems exactly backwards.
//
// NOTE(cloutiertyler): A comment on the deranged behavior: the mental
// model is that of an iterator, but for a stream instead. i.e. you pull
// off of an iterator until it returns `None`, which means that the
// iterator is exhausted. If you try to pull off the iterator and
// there's nothing there but it's not exhausted, it (arguably sensibly)
// returns `Err(_)`. Similar behavior as `Iterator::next` and
// `Stream::poll_next`. No comment on whether this is a good mental
// model or not.
let res = match self.recv.blocking_lock().try_next() {
Ok(None) => {
let disconnect_ctx = self.make_event_ctx(crate::Error::Disconnected);
let disconnect_ctx = self.make_event_ctx(None);
self.invoke_disconnected(&disconnect_ctx);
Err(crate::Error::Disconnected)
}
Expand Down Expand Up @@ -529,7 +538,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
match self.runtime.block_on(self.get_message()) {
Message::Local(pending) => self.apply_mutation(pending),
Message::Ws(None) => {
let disconnect_ctx = self.make_event_ctx(crate::Error::Disconnected);
let disconnect_ctx = self.make_event_ctx(None);
self.invoke_disconnected(&disconnect_ctx);
Err(crate::Error::Disconnected)
}
Expand All @@ -544,7 +553,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
match self.get_message().await {
Message::Local(pending) => self.apply_mutation(pending),
Message::Ws(None) => {
let disconnect_ctx = self.make_event_ctx(crate::Error::Disconnected);
let disconnect_ctx = self.make_event_ctx(None);
self.invoke_disconnected(&disconnect_ctx);
Err(crate::Error::Disconnected)
}
Expand Down Expand Up @@ -687,9 +696,10 @@ impl<M: SpacetimeModule> DbContextImpl<M> {

type OnConnectCallback<M> = Box<dyn FnOnce(&<M as SpacetimeModule>::DbConnection, Identity, &str) + Send + 'static>;

type OnConnectErrorCallback<M> = Box<dyn FnOnce(&<M as SpacetimeModule>::ErrorContext) + Send + 'static>;
type OnConnectErrorCallback<M> = Box<dyn FnOnce(&<M as SpacetimeModule>::ErrorContext, crate::Error) + Send + 'static>;

type OnDisconnectCallback<M> = Box<dyn FnOnce(&<M as SpacetimeModule>::ErrorContext) + Send + 'static>;
type OnDisconnectCallback<M> =
Box<dyn FnOnce(&<M as SpacetimeModule>::ErrorContext, Option<crate::Error>) + Send + 'static>;

/// All the stuff in a [`DbContextImpl`] which can safely be locked while invoking callbacks.
pub(crate) struct DbContextImplInner<M: SpacetimeModule> {
Expand Down Expand Up @@ -964,7 +974,7 @@ Instead of registering multiple `on_connect` callbacks, register a single callba
/// Register a callback to run when the connection fails asynchronously,
/// e.g. due to invalid credentials.
// FIXME: currently never called; `on_disconnect` is called instead.
pub fn on_connect_error(mut self, callback: impl FnOnce(&M::ErrorContext) + Send + 'static) -> Self {
pub fn on_connect_error(mut self, callback: impl FnOnce(&M::ErrorContext, crate::Error) + Send + 'static) -> Self {
if self.on_connect_error.is_some() {
panic!(
"DbConnectionBuilder can only register a single `on_connect_error` callback.
Expand All @@ -979,7 +989,10 @@ Instead of registering multiple `on_connect_error` callbacks, register a single

/// Register a callback to run when the connection is closed.
// FIXME: currently also called when the connection fails asynchronously, instead of `on_connect_error`.
pub fn on_disconnect(mut self, callback: impl FnOnce(&M::ErrorContext) + Send + 'static) -> Self {
pub fn on_disconnect(
mut self,
callback: impl FnOnce(&M::ErrorContext, Option<crate::Error>) + Send + 'static,
) -> Self {
if self.on_disconnect.is_some() {
panic!(
"DbConnectionBuilder can only register a single `on_disconnect` callback.
Expand Down
2 changes: 1 addition & 1 deletion crates/sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use thiserror::Error;
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
#[error("Connection has terminated")]
#[error("Connection is already disconnected or has terminated normally")]
Disconnected,

#[error("Failed to connect: {source}")]
Expand Down
2 changes: 1 addition & 1 deletion crates/sdk/src/spacetime_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ where
}

/// [`AbstractEventContext`] subtrait for subscription and connection error callbacks.
pub trait ErrorContext: AbstractEventContext<Event = crate::Error>
pub trait ErrorContext: AbstractEventContext<Event = Option<crate::Error>>
where
Self::Module: SpacetimeModule<ErrorContext = Self>,
{
Expand Down
33 changes: 18 additions & 15 deletions crates/sdk/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! This module is internal, and may incompatibly change without warning.
use crate::spacetime_module::AbstractEventContext;
use crate::{
db_connection::{next_request_id, next_subscription_id, DbContextImpl, PendingMutation},
spacetime_module::{SpacetimeModule, SubscriptionHandle},
Expand Down Expand Up @@ -31,7 +32,8 @@ impl<M: SpacetimeModule> Default for SubscriptionManager<M> {

pub(crate) type OnAppliedCallback<M> =
Box<dyn FnOnce(&<M as SpacetimeModule>::SubscriptionEventContext) + Send + 'static>;
pub(crate) type OnErrorCallback<M> = Box<dyn FnOnce(&<M as SpacetimeModule>::ErrorContext) + Send + 'static>;
pub(crate) type OnErrorCallback<M> =
Box<dyn FnOnce(&<M as SpacetimeModule>::ErrorContext, crate::Error) + Send + 'static>;
pub type OnEndedCallback<M> = Box<dyn FnOnce(&<M as SpacetimeModule>::SubscriptionEventContext) + Send + 'static>;

/// When handling a pending unsubscribe, there are three cases the caller must handle.
Expand All @@ -45,23 +47,24 @@ pub(crate) enum PendingUnsubscribeResult<M: SpacetimeModule> {
}

impl<M: SpacetimeModule> SubscriptionManager<M> {
pub(crate) fn on_disconnect(&mut self, ctx: &M::ErrorContext) {
pub(crate) fn on_disconnect(&mut self, _ctx: &M::ErrorContext) {
// We need to clear all the subscriptions.
// We should run the on_error callbacks for all of them.
// TODO: is this correct? We don't remove them from the client cache,
// we may want to resume them in the future if we impl reconnecting,
// and users can already register on-disconnect callbacks which will run in this case.

for (_, mut sub) in self.new_subscriptions.drain() {
if let Some(callback) = sub.on_error() {
callback(ctx);
}
}
for (_, mut s) in self.legacy_subscriptions.drain() {
if let Some(callback) = s.on_error.take() {
callback(ctx);
}
}
// NOTE(cloutiertyler)
// This function previously invoke `on_error` for all subscriptions.
// However, this is inconsistent behavior given that `on_disconnect` for
// connections no longer always has an error argument and that the user
// can add an `on_ended` callback when unsubscribing.
//
// We propose instead that `on_ended` be added to the subscription
// builder so that it can be invoked when the subscription is ended
// because of a normal disconnect, but without the user calling
// `unsubscribe_then`. This can be done in a non-breaking way.
//
// For now, we will just do nothing when a subscription ends normally.
}

/// Register a new subscription. This does not send the subscription to the server.
Expand Down Expand Up @@ -160,7 +163,7 @@ impl<M: SpacetimeModule> SubscriptionManager<M> {
return;
};
if let Some(callback) = sub.on_error() {
callback(ctx)
callback(ctx, ctx.event().clone().unwrap());
}
}
}
Expand Down Expand Up @@ -207,7 +210,7 @@ impl<M: SpacetimeModule> SubscriptionBuilder<M> {
/// or later during the subscription's lifetime if the module's interface changes,
/// in which case [`Self::on_applied`] may have already run.
// Currently unused. Hooking this up requires the new subscription interface and WS protocol.
pub fn on_error(mut self, callback: impl FnOnce(&M::ErrorContext) + Send + 'static) -> Self {
pub fn on_error(mut self, callback: impl FnOnce(&M::ErrorContext, crate::Error) + Send + 'static) -> Self {
self.on_error = Some(Box::new(callback));
self
}
Expand Down
24 changes: 10 additions & 14 deletions crates/sdk/tests/connect_disconnect_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod module_bindings;

use module_bindings::*;

use spacetimedb_sdk::{DbContext, Error, Table};
use spacetimedb_sdk::{DbContext, Table};

use test_counter::TestCounter;

Expand All @@ -15,7 +15,6 @@ fn db_name_or_panic() -> String {
fn main() {
let disconnect_test_counter = TestCounter::new();
let disconnect_result = disconnect_test_counter.add_test("disconnect");
let on_error_result = disconnect_test_counter.add_test("on_error");

let connect_test_counter = TestCounter::new();
let connected_result = connect_test_counter.add_test("on_connect");
Expand All @@ -24,15 +23,12 @@ fn main() {
let connection = DbConnection::builder()
.with_module_name(db_name_or_panic())
.with_uri(LOCALHOST)
.on_connect_error(|ctx| panic!("on_connect_error: {:?}", ctx.event))
.on_connect_error(|_ctx, error| panic!("on_connect_error: {:?}", error))
.on_connect(move |ctx, _, _| {
connected_result(Ok(()));
ctx.subscription_builder()
.on_error(|ctx| {
if !matches!(ctx.event, Error::Disconnected) {
panic!("Subscription failed: {:?}", ctx.event)
}
on_error_result(Ok(()));
.on_error(|_ctx, error| {
panic!("Subscription failed: {:?}", error);
})
.on_applied(move |ctx| {
let check = || {
Expand All @@ -48,14 +44,14 @@ fn main() {
})
.subscribe("SELECT * FROM connected");
})
.on_disconnect(move |ctx| {
.on_disconnect(move |ctx, error| {
assert!(
!ctx.is_active(),
"on_disconnect callback, but `ctx.is_active()` is true"
);
match &ctx.event {
Error::Disconnected => disconnect_result(Ok(())),
err => disconnect_result(Err(anyhow::anyhow!("{err:?}"))),
match error {
Some(err) => disconnect_result(Err(anyhow::anyhow!("{err:?}"))),
None => disconnect_result(Ok(())),
}
})
.build()
Expand All @@ -75,7 +71,7 @@ fn main() {
let sub_applied_one_row_result = reconnect_test_counter.add_test("disconnected_row");

let new_connection = DbConnection::builder()
.on_connect_error(|ctx| panic!("on_connect_error: {:?}", ctx.event))
.on_connect_error(|_ctx, error| panic!("on_connect_error: {:?}", error))
.on_connect(move |_ctx, _, _| {
reconnected_result(Ok(()));
})
Expand All @@ -98,7 +94,7 @@ fn main() {
};
sub_applied_one_row_result(check());
})
.on_error(|ctx| panic!("subscription on_error: {:?}", ctx.event))
.on_error(|_ctx, error| panic!("subscription on_error: {:?}", error))
.subscribe("SELECT * FROM disconnected");

new_connection.run_threaded();
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

2 comments on commit 03fda01

@github-actions
Copy link

@github-actions github-actions bot commented on 03fda01 Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

@github-actions
Copy link

@github-actions github-actions bot commented on 03fda01 Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.