Core implement ws server listener interface for subs and adverts #189

Contributor

@eloff eloff commented Feb 5, 2025

Add new callbacks to ServerListener:

fn on_subscribe(&self, _channel_id: ChannelId) {}
fn on_unsubscribe(&self, _channel_id: ChannelId) {}
fn on_client_advertise(&self, _channel: &ClientChannel) {}
fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}

This mirrors how the callbacks are defined in the Python implementation, and they behave similarly (e.g. they do not fire for duplicate or erroneous requests).

I added RecordingServerListener to testutil to make ServerListener easier to test.

I modified existing tests to verify these callbacks are called at the appropriate times with the expected arguments, and not called for duplicate requests.
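
For readers unfamiliar with the pattern, here is a minimal sketch of a listener trait with default no-op methods and a recording test double. The callback names come from the description above; the type definitions, the `Event` enum, and the body of `RecordingServerListener` are assumptions for illustration, not the SDK's actual code.

```rust
use std::sync::Mutex;

// Hypothetical stand-ins for the SDK's ID and channel types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelId(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientChannelId(pub u32);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientChannel {
    pub id: ClientChannelId,
    pub topic: String,
}

/// Every method has a default no-op body, so implementers override only what they need.
pub trait ServerListener: Send + Sync {
    fn on_subscribe(&self, _channel_id: ChannelId) {}
    fn on_unsubscribe(&self, _channel_id: ChannelId) {}
    fn on_client_advertise(&self, _channel: &ClientChannel) {}
    fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}
}

/// One recorded callback invocation (hypothetical helper type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    Subscribe(ChannelId),
    Unsubscribe(ChannelId),
    ClientAdvertise(ClientChannel),
    ClientUnadvertise(ClientChannelId),
}

/// Test double that appends each callback to a list for later assertions.
#[derive(Default)]
pub struct RecordingServerListener {
    events: Mutex<Vec<Event>>,
}

impl RecordingServerListener {
    /// Returns the recorded events and clears the log.
    pub fn take_events(&self) -> Vec<Event> {
        std::mem::take(&mut *self.events.lock().unwrap())
    }
}

impl ServerListener for RecordingServerListener {
    fn on_subscribe(&self, channel_id: ChannelId) {
        self.events.lock().unwrap().push(Event::Subscribe(channel_id));
    }
    fn on_unsubscribe(&self, channel_id: ChannelId) {
        self.events.lock().unwrap().push(Event::Unsubscribe(channel_id));
    }
    fn on_client_advertise(&self, channel: &ClientChannel) {
        self.events.lock().unwrap().push(Event::ClientAdvertise(channel.clone()));
    }
    fn on_client_unadvertise(&self, channel_id: ClientChannelId) {
        self.events.lock().unwrap().push(Event::ClientUnadvertise(channel_id));
    }
}
```

A test can then drive the server, call `take_events()`, and compare the result against the expected sequence, which also catches callbacks that fire when they should not.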

@eloff eloff requested review from gasmith and bryfox February 5, 2025 22:44

Contributor

@bryfox bryfox left a comment

Looking good to me. As discussed, I think we can probably bring the server listener out from behind the feature now.

There's a client-publish example which uses part of this — it might be good to augment that with the advertisement handling.

let final_state = vec.get().to_vec();

// Old snapshot should still be valid and have original length
assert_eq!(old_snapshot.len(), 3);
// Both threads should see 5 items in their final state
assert_eq!(final_state.len(), 5);
assert_eq!(thread_state.len(), 5);

Contributor

I'm not sure why this is no longer desired, but the comment above is now stale.

Contributor Author

I'll fix the comment; this was a race condition. The thread would see 4 items if it runs before the push(5) that follows the thread start.

client_sender
    .send(Message::text(subscribe.to_string()))
    .await
    .expect("Failed to send");

// Allow the server to process the subscription
// FG-9723: replace this with an on_subscribe callback
// (whoops, that won't work either, unless we do something like polling the recording_listener)

Contributor

Or maybe a way to flush the queues?

I think we should file an issue so we can clean up the comment above (not referencing a completed ticket) — whatever approach you think is best.

Contributor Author

There doesn't seem to be a way to wait for the queue to be drained other than polling is_empty, and even that only tells us the data was removed from the queue, not that it was acted on. So I think it's the wrong thing to do.

I'm not sure what a sensible way to do this would look like. It's easy enough to come up with something just for tests where we poll somehow and wait for the messages to be processed.

Is that just for tests, or do we need something more polished that we could expose to the user?

Contributor

In my opinion, just for tests. I guess we don't necessarily need to poll the queue — we could have a test helper that polls, with a timeout, whatever future we want to wait on in the test (e.g. client_receiver.next())

Contributor

@gasmith gasmith Feb 6, 2025

A pattern I've used in the past is assert_eventually(cond: impl Fn() -> bool). Imagine we had something like:

let client: Arc<TestClient> = ...;
tokio::spawn(client.clone().receive_forever());

for msg in [/* requests */] {
    client.send(msg).await.unwrap();
}

let expected = vec![ /* responses */];
dbg!(&expected);
assert_eventually(|| {
    expected == dbg!(client.get_received())
}).await;
client.reset_received();
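
A minimal sketch of such a helper, assuming a synchronous, std-only variant with an explicit timeout (the snippet above awaits the helper, so an async version would sleep with the runtime's timer instead of `thread::sleep`). The `eventually` function and the 5 ms poll interval are illustrative choices.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `cond` until it returns true or `timeout` elapses.
/// Returns whether the condition was observed to hold.
pub fn eventually(cond: impl Fn() -> bool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if cond() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Back off briefly so the polling loop does not spin a core.
        thread::sleep(Duration::from_millis(5));
    }
}

/// Panics if `cond` does not become true within `timeout`.
pub fn assert_eventually(cond: impl Fn() -> bool, timeout: Duration) {
    assert!(eventually(cond, timeout), "condition not met within {timeout:?}");
}
```

Because the condition is re-evaluated on every poll, the test does not need to know when the server finishes processing, only that it finishes before the timeout.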

Contributor Author

That future is buried at the bottom of Server::handle_connection, which is what makes it tricky. I'll create a ticket to track it, but I'd have to experiment and think about how we could implement it in a sensible way.

rust/foxglove/src/tests/websocket.rs (outdated, resolved)
/// Callback invoked when a client advertises a client channel. Requires the "clientPublish" capability.
fn on_client_advertise(&self, _channel: &ClientChannel) {}
/// Callback invoked when a client unadvertises a client channel. Requires the "clientPublish" capability.
fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}

Contributor

Not for this PR, but I'm wondering if we can design a trait in a way that if you implement one of the 'clientPublish' functions, it forces you to implement all — to help guide correct usage.

Contributor Author

Yeah, I think that's a nice idea. I don't know exactly how that would look (or if we can do that.) Something to keep in mind.
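
One possible shape for this, sketched under assumptions: move the 'clientPublish' callbacks into a separate trait whose methods have no default bodies, and have the main trait hand out an optional reference to it. `ClientPublishListener`, `client_publish`, `Passive`, `ByteCounter`, and `dispatch_message` are hypothetical names, and the signatures are simplified.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the SDK's client channel ID type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientChannelId(pub u32);

/// Callbacks that only make sense with the "clientPublish" capability.
/// None has a default body, so an implementer must supply all three.
pub trait ClientPublishListener: Send + Sync {
    fn on_client_advertise(&self, channel_id: ClientChannelId, topic: &str);
    fn on_client_unadvertise(&self, channel_id: ClientChannelId);
    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
}

pub trait ServerListener: Send + Sync {
    /// Opt in to client publishing by returning a handler; defaults to none.
    fn client_publish(&self) -> Option<&dyn ClientPublishListener> {
        None
    }
}

/// A listener that ignores client publishing entirely.
pub struct Passive;
impl ServerListener for Passive {}

/// A listener that counts the bytes published by clients.
#[derive(Default)]
pub struct ByteCounter {
    bytes: AtomicUsize,
}

impl ByteCounter {
    pub fn total(&self) -> usize {
        self.bytes.load(Ordering::Relaxed)
    }
}

impl ClientPublishListener for ByteCounter {
    fn on_client_advertise(&self, _channel_id: ClientChannelId, _topic: &str) {}
    fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}
    fn on_message_data(&self, _channel_id: ClientChannelId, payload: &[u8]) {
        self.bytes.fetch_add(payload.len(), Ordering::Relaxed);
    }
}

impl ServerListener for ByteCounter {
    fn client_publish(&self) -> Option<&dyn ClientPublishListener> {
        Some(self)
    }
}

/// How a server might route a client message; returns false if nobody handles it.
pub fn dispatch_message(listener: &dyn ServerListener, id: ClientChannelId, payload: &[u8]) -> bool {
    match listener.client_publish() {
        Some(handler) => {
            handler.on_message_data(id, payload);
            true
        }
        None => false,
    }
}
```

With this split the compiler rejects a partial implementation of the client-publish group, while listeners that do not care about it stay a one-line `impl`.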

Contributor

@gasmith gasmith left a comment

A few early comments on the trait, still reading through the rest.

 pub trait ServerListener: Send + Sync {
     /// Callback invoked when a client message is received.
-    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
+    fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {}

Contributor

Is the ClientChannelId globally unique, or locally-unique to a particular client? I presume the latter? Might IDs be reused, from the perspective of an implementer? We should probably update the rust doc for that type, and add some commentary to these methods to help guide user expectations.

We should add a reference to https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#client-message-data. Same goes for the other methods, I think.

Contributor Author

@eloff eloff Feb 6, 2025

It should only be unique per client, which suggests maybe we should pass in some kind of identifier for the client as well (what? SocketAddr? make up an id?). The Python implementation doesn't address that, and has these same signatures.

The payload is fine, the message only contains the channel id and the payload, and we pass those separately to the callback. I'll double-check the other methods, but I believe that holds as well.

Contributor Author

I added a global integer id for clients, so the subscriber can tell them apart
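
A minimal sketch of what a global integer client ID could look like, assuming an atomic counter; the `ClientId::next` constructor and the `ClientChannelKey` alias are hypothetical. Pairing the client ID with the per-client channel ID gives listeners a key that is unique across connections.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Process-wide identifier for a connected client (hypothetical definition).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClientId(u32);

impl ClientId {
    /// Allocates the next ID. IDs are not reused until the counter wraps.
    pub fn next() -> Self {
        static NEXT: AtomicU32 = AtomicU32::new(1);
        ClientId(NEXT.fetch_add(1, Ordering::Relaxed))
    }
}

/// Chosen by each client, so only unique within one connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClientChannelId(pub u32);

/// Key that stays unique even when two clients pick the same channel ID.
pub type ClientChannelKey = (ClientId, ClientChannelId);
```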

Comment on lines 137 to 140
     /// Callback invoked when a client message is received.
-    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
+    fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {}
+    /// Callback invoked when a client subscribes to a channel.
+    fn on_subscribe(&self, _channel_id: ChannelId) {}

Contributor

For server-advertised channels, we allocate IDs internally. The ID is recoverable, but we're asking trait implementations to do some extra legwork to map a channel ID back to something they understand (like a topic). Maybe we go a bit further.

Presumably we also need to pass the client identity, whatever that may be (perhaps a struct with a view over client metadata, so we can add getter methods later without breaking compatibility?). Actually, I think this comment applies to all of the methods in this trait.

Contributor Author

What if we map the ChannelId back to the Channel and pass that instead to these two particular callbacks? That seems like it should work.

Contributor Author

I implemented this

-/// Provides a mechanism for registering callbacks for
-/// handling client message events.
+/// Provides a mechanism for registering callbacks for handling client message events.
+/// All methods are optional.
 pub trait ServerListener: Send + Sync {

Contributor

@gasmith gasmith Feb 6, 2025

Seems like we might also want callbacks for client connect and disconnect, so that an implementer knows when to flush client-specific state (about channel advertisements, subscriptions, etc.).

Contributor Author

The existing implementations don't go that far, but it seems like a good idea to me
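
To illustrate why connect and disconnect callbacks would help, here is a sketch of a listener that keeps per-client subscription state and flushes it on disconnect. `on_client_connect` and `on_client_disconnect` are hypothetical and not part of the trait in this PR; the ID types are simplified stand-ins.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

// Hypothetical stand-ins for the SDK's identifier types.
pub type ClientId = u32;
pub type ChannelId = u64;

pub trait ServerListener: Send + Sync {
    // Hypothetical lifecycle callbacks, as proposed in the review.
    fn on_client_connect(&self, _client: ClientId) {}
    fn on_client_disconnect(&self, _client: ClientId) {}
    fn on_subscribe(&self, _client: ClientId, _channel: ChannelId) {}
}

/// Tracks which channels each connected client is subscribed to.
#[derive(Default)]
pub struct SubscriptionTracker {
    by_client: Mutex<HashMap<ClientId, HashSet<ChannelId>>>,
}

impl SubscriptionTracker {
    /// Number of channels the client is subscribed to, or None if the client is unknown.
    pub fn subscription_count(&self, client: ClientId) -> Option<usize> {
        self.by_client.lock().unwrap().get(&client).map(|set| set.len())
    }
}

impl ServerListener for SubscriptionTracker {
    fn on_client_connect(&self, client: ClientId) {
        self.by_client.lock().unwrap().entry(client).or_default();
    }
    fn on_client_disconnect(&self, client: ClientId) {
        // Flush everything remembered about this client.
        self.by_client.lock().unwrap().remove(&client);
    }
    fn on_subscribe(&self, client: ClientId, channel: ChannelId) {
        self.by_client.lock().unwrap().entry(client).or_default().insert(channel);
    }
}
```

Without the disconnect callback, the map above would grow for as long as the server runs, since nothing tells the listener that a client's state is stale.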

for subscription_id in subscription_ids {
    if let Some((channel_id, _)) = subscriptions.remove_by_right(&subscription_id) {
        if let Some(handler) = self.server_listener.as_ref() {
            handler.on_unsubscribe(channel_id);

Contributor

I don't think we should hold the subscriptions lock across the callback. If we do, we need to document that the SDK is non-reentrant.

Contributor Author

Yeah, that's a good point
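
A sketch of the usual fix, assuming a plain `HashMap` behind a `std::sync::Mutex` rather than the SDK's actual subscription map: collect the affected channel IDs while the lock is held, release it, then invoke the callbacks. `Client`, `Listener`, and `ReentrantListener` are simplified, hypothetical types.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-ins for the SDK's identifier types.
pub type SubscriptionId = u32;
pub type ChannelId = u64;

pub trait Listener {
    fn on_unsubscribe(&self, channel_id: ChannelId);
}

pub struct Client {
    /// Maps subscription ID to channel ID.
    subscriptions: Mutex<HashMap<SubscriptionId, ChannelId>>,
}

impl Client {
    pub fn new(entries: impl IntoIterator<Item = (SubscriptionId, ChannelId)>) -> Self {
        Client { subscriptions: Mutex::new(entries.into_iter().collect()) }
    }

    pub fn subscription_count(&self) -> usize {
        self.subscriptions.lock().unwrap().len()
    }

    pub fn unsubscribe(&self, ids: &[SubscriptionId], listener: &dyn Listener) {
        // Phase 1: update shared state while holding the lock.
        let removed: Vec<ChannelId> = {
            let mut subs = self.subscriptions.lock().unwrap();
            ids.iter().filter_map(|id| subs.remove(id)).collect()
        }; // The guard is dropped here.

        // Phase 2: run callbacks with no lock held, so they may call back in.
        for channel_id in removed {
            listener.on_unsubscribe(channel_id);
        }
    }
}

/// A listener that re-enters the client from inside its callback.
pub struct ReentrantListener<'a> {
    client: &'a Client,
    seen: RefCell<Vec<(ChannelId, usize)>>,
}

impl<'a> ReentrantListener<'a> {
    pub fn new(client: &'a Client) -> Self {
        ReentrantListener { client, seen: RefCell::new(Vec::new()) }
    }

    pub fn seen(&self) -> Vec<(ChannelId, usize)> {
        self.seen.borrow().clone()
    }
}

impl Listener for ReentrantListener<'_> {
    fn on_unsubscribe(&self, channel_id: ChannelId) {
        // This call takes the subscriptions lock; it would not return if
        // `unsubscribe` still held the guard.
        let remaining = self.client.subscription_count();
        self.seen.borrow_mut().push((channel_id, remaining));
    }
}
```

The trade-off is that callbacks observe the state after the whole batch has been applied, not the state at the moment each individual subscription was removed.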

subscription.id
);
if let Some(handler) = self.server_listener.as_ref() {
handler.on_subscribe(subscription.channel_id);

Contributor

Probably ought to drop the channels lock before making these callbacks.

The main downside, I suppose, is that we might process a subscription callback after a channel is removed from the log context. But I think that's a straightforward corner case to document, and probably not one that makes much of a difference to an implementation of this trait.

/// Callback invoked when a client subscribes to a channel.
fn on_subscribe(&self, _channel_id: ChannelId) {}
/// Callback invoked when a client unsubscribes from a channel.
fn on_unsubscribe(&self, _channel_id: ChannelId) {}

Contributor

We should maybe document the fact that we do not invoke this callback for a subscribed channel that is removed from the log context, and unadvertised to the client. Maybe that's obvious, but it doesn't hurt to call it out.

Comment on lines 233 to 236
for id in channel_ids
    .iter()
    .cloned()
    .filter(|id| !channels_not_found.contains(id))

Contributor

@gasmith gasmith Feb 6, 2025

Since we have to alloc a new vec anyway, we might as well just store the channel IDs we did find and avoid the possible O(N^2) over the client payload.

Contributor Author

channels_not_found is basically an error case that's not going to happen often, so I would expect this approach to perform best on average, but with worse worst-case performance (which is why I did it this way)
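
For comparison, a sketch of both approaches under simplified assumptions: `known` stands in for the server's channel table, and the function names are hypothetical. The first splits the request into found and not-found IDs in one pass; the second filters against the not-found list, which is cheap when that list is usually empty but quadratic in the worst case.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the SDK's channel ID type.
pub type ChannelId = u64;

/// Splits requested IDs into (found, not_found) in a single pass.
pub fn partition_requested(
    requested: &[ChannelId],
    known: &HashSet<ChannelId>,
) -> (Vec<ChannelId>, Vec<ChannelId>) {
    requested.iter().copied().partition(|id| known.contains(id))
}

/// Filters out IDs that appear in `not_found` with a linear scan per ID.
pub fn filter_found(requested: &[ChannelId], not_found: &[ChannelId]) -> Vec<ChannelId> {
    requested
        .iter()
        .copied()
        .filter(|id| !not_found.contains(id))
        .collect()
}
```

Both produce the same list of found IDs in request order; they differ only in how the cost scales when a client sends many unknown IDs.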

 pub trait ServerListener: Send + Sync {
     /// Callback invoked when a client message is received.
-    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
+    fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {}

Contributor

@gasmith gasmith Feb 6, 2025

Seems we look up the ClientChannel before invoking this callback. Maybe we should pass a (view over) ClientChannel, instead of asking the trait implementation to maintain its own <ClientChannelId, ClientChannel> map.

@@ -171,60 +182,30 @@ impl ConnectedClient {
self.send_error(format!("Invalid message: {message}"));
return;
};
let Some(server) = self.server.upgrade() else {
tracing::error!("Server closed");

Contributor

Maybe a bit loud? Is this something the user can do anything about?

pub use log_context::GlobalContextTest;
pub use log_sink::{ErrorSink, MockSink, RecordingSink};
use parking_lot::Mutex;

#[allow(dead_code)]

Contributor

Still necessary?

Contributor Author

Turns out not needed anymore

@eloff eloff requested review from bryfox and gasmith February 6, 2025 23:33

Contributor

@gasmith gasmith left a comment

LG, just some minor dangling comments.

}
/// Callback invoked when a client subscribes to a channel.
/// Only invoked if the channel is associated with the server and isn't already subscribed to by the client.
fn on_subscribe(&self, _client_id: ClientId, _channel_id: Arc<Channel>) {}

Contributor

Can rename these args as channel now, instead of channel_id.

Also, maybe we just pass a &Channel so that implementations aren't tempted to clone or hold on to the arc.

@@ -96,11 +103,23 @@ pub(crate) struct Server {
 /// handling client message events.
 pub trait ServerListener: Send + Sync {
     /// Callback invoked when a client message is received.
-    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
+    fn on_message_data(&self, _client_id: ClientId, _channel_id: ClientChannelId, _payload: &[u8]) {

Contributor

Should we also pass &ClientChannel here (and on_client_unadvertise)? We're already doing the lookup before invoking the callback.

Contributor Author

@eloff eloff Feb 7, 2025

Based on the slack discussion, I went with wrapper view structs for all the argument types, exposing the bare minimum for now.

We'll need to provide some way to lookup a ClientChannel to get the full data, as that is now the one place where we provide less data than the existing callbacks in the original Python implementation.
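
A rough sketch of the wrapper-view idea, assuming simplified internal types: each view borrows the internal struct and exposes only a few getters, so more can be added later without changing callback signatures. `ClientChannelView` is named in this thread; `ClientView`, the field layout, the constructors, and `TopicLogger` are assumptions for illustration.

```rust
use std::sync::Mutex;

/// Internal state for a connection (hypothetical fields).
pub struct ConnectedClient {
    id: u32,
}

/// Internal state for a client-advertised channel (hypothetical fields).
pub struct ClientChannel {
    id: u32,
    topic: String,
    #[allow(dead_code)] // Not exposed through the view yet.
    encoding: String,
}

impl ConnectedClient {
    pub fn new(id: u32) -> Self {
        ConnectedClient { id }
    }
}

impl ClientChannel {
    pub fn new(id: u32, topic: &str, encoding: &str) -> Self {
        ClientChannel { id, topic: topic.to_string(), encoding: encoding.to_string() }
    }
}

/// Read-only view over a client; exposes only the ID for now.
#[derive(Clone, Copy)]
pub struct ClientView<'a>(&'a ConnectedClient);

impl<'a> ClientView<'a> {
    pub fn new(client: &'a ConnectedClient) -> Self {
        ClientView(client)
    }
    pub fn id(&self) -> u32 {
        self.0.id
    }
}

/// Read-only view over a client channel; exposes the ID and topic.
#[derive(Clone, Copy)]
pub struct ClientChannelView<'a>(&'a ClientChannel);

impl<'a> ClientChannelView<'a> {
    pub fn new(channel: &'a ClientChannel) -> Self {
        ClientChannelView(channel)
    }
    pub fn id(&self) -> u32 {
        self.0.id
    }
    pub fn topic(&self) -> &'a str {
        let inner: &'a ClientChannel = self.0;
        &inner.topic
    }
}

pub trait ServerListener: Send + Sync {
    fn on_message_data(&self, _client: ClientView<'_>, _channel: ClientChannelView<'_>, _payload: &[u8]) {}
}

/// Example listener that records one line per client message.
#[derive(Default)]
pub struct TopicLogger {
    lines: Mutex<Vec<String>>,
}

impl TopicLogger {
    pub fn lines(&self) -> Vec<String> {
        self.lines.lock().unwrap().clone()
    }
}

impl ServerListener for TopicLogger {
    fn on_message_data(&self, client: ClientView<'_>, channel: ClientChannelView<'_>, payload: &[u8]) {
        let line = format!(
            "client {}, channel {} ({}): {} bytes",
            client.id(),
            channel.id(),
            channel.topic(),
            payload.len()
        );
        self.lines.lock().unwrap().push(line);
    }
}
```

Because the views hold borrows, an implementation cannot keep them past the callback, which also addresses the concern about listeners holding on to an `Arc`.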

Contributor

@gasmith gasmith left a comment

LG. I suspect we'll need to expand ClientChannelView with schema information for it to be useful, but we can cross that bridge when we get there.

@eloff eloff merged commit 62680fe into main Feb 7, 2025
27 checks passed
@eloff eloff deleted the dan/fg-9723-core-implement-ws-server-listener-interface-for-subs-adverts branch February 7, 2025 18:55