Core implement ws server listener interface for subs and adverts #189
…r-interface-for-subs-adverts
Looking good to me. As discussed, I think we can probably bring the server listener out from behind the feature now.
There's a client-publish example which uses part of this; it might be good to augment that with the advertisement handling.
let final_state = vec.get().to_vec();

// Old snapshot should still be valid and have original length
assert_eq!(old_snapshot.len(), 3);
// Both threads should see 5 items in their final state
assert_eq!(final_state.len(), 5);
assert_eq!(thread_state.len(), 5);
I'm not sure why this is no longer desired, but the comment above is now stale.
I'll fix the comment, this was a race condition. It would be 4 items if the thread runs before the push(5) after starting the thread.
rust/foxglove/src/tests/websocket.rs
Outdated
client_sender
    .send(Message::text(subscribe.to_string()))
    .await
    .expect("Failed to send");

// Allow the server to process the subscription
// FG-9723: replace this with an on_subscribe callback
// (whoops, that won't work either, unless we do something like polling the recording_listener)
Or maybe a way to flush the queues?
I think we should file an issue so we can clean up the comment above (not referencing a completed ticket) — whatever approach you think is best.
There doesn't seem to be a way to wait for the queue to be drained, other than polling it, checking is_empty - but that still doesn't mean the data was acted on, just that it was removed from the queue. So I think it's the wrong thing to do.
I'm not sure what a sensible way to do this would look like. It's easy enough to come up with something just for tests where we poll somehow and wait for the messages to be processed.
Is that just for tests, or do we need something more polished that we could expose to the user?
In my opinion, just for tests. I guess we don't necessarily need to poll the queue; we could have a test helper that polls, with a timeout, whatever future we want to wait on in the test (e.g. client_receiver.next()).
A pattern I've used in the past is assert_eventually(cond: impl Fn() -> bool). Imagine we had something like:
let client: Arc<TestClient> = ...;
tokio::spawn(client.clone().receive_forever());
for msg in [/* requests */] {
    client.send(msg).await.unwrap();
}
let expected = vec![/* responses */];
dbg!(&expected);
assert_eventually(|| {
    expected == dbg!(client.get_received())
}).await;
client.reset_received();
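For illustration, here is a minimal sketch of what such a helper could look like. It is a blocking, std-only variant rather than the async one implied by the `.await` above; the `timeout` and `interval` parameters and the `eventually` function are assumptions made for this sketch, not part of the SDK or its test utilities.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `cond` until it returns true or `timeout` elapses.
/// Returns whether the condition was met. (Hypothetical helper.)
fn eventually(cond: impl Fn() -> bool, timeout: Duration, interval: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if cond() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Back off briefly so the thread doing the real work can make progress.
        thread::sleep(interval);
    }
}

/// Panics if `cond` does not become true within `timeout`.
fn assert_eventually(cond: impl Fn() -> bool, timeout: Duration) {
    assert!(
        eventually(cond, timeout, Duration::from_millis(5)),
        "condition not met within {:?}",
        timeout
    );
}
```

An async version would presumably swap `thread::sleep` for the runtime's sleep and make both functions `async`; the polling structure stays the same.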
That future is buried at the bottom of Server::handle_connection, which is what makes it tricky. I'll create a ticket to track it, but I'd have to experiment and think about how we could implement it in a sensible way.
rust/foxglove/src/websocket.rs
Outdated
/// Callback invoked when a client advertises a client channel. Requires the "clientPublish" capability.
fn on_client_advertise(&self, _channel: &ClientChannel) {}
/// Callback invoked when a client unadvertises a client channel. Requires the "clientPublish" capability.
fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}
Not for this PR, but I'm wondering if we can design a trait in a way that if you implement one of the 'clientPublish' functions, it forces you to implement all — to help guide correct usage.
Yeah, I think that's a nice idea. I don't know exactly how that would look (or if we can do that.) Something to keep in mind.
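One way this could look, purely as a sketch: move the "clientPublish" callbacks into a separate trait with no default bodies, and have the main listener opt in by returning a handler. The `ClientPublishListener` trait, the `client_publish` method, the `Counter` type, and the simplified `ClientChannel`/`ClientChannelId` definitions are all hypothetical and not taken from the SDK.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-ins for the SDK types named in the review.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientChannelId(pub u32);

pub struct ClientChannel {
    pub id: ClientChannelId,
    pub topic: String,
}

/// All "clientPublish" callbacks live here without default bodies,
/// so an implementer has to provide every one of them.
pub trait ClientPublishListener: Send + Sync {
    fn on_client_advertise(&self, channel: &ClientChannel);
    fn on_client_unadvertise(&self, channel_id: ClientChannelId);
    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
}

pub trait ServerListener: Send + Sync {
    /// Opt in to the capability by returning a handler; `None` by default.
    fn client_publish(&self) -> Option<&dyn ClientPublishListener> {
        None
    }
}

/// Example implementer that counts client messages.
pub struct Counter {
    pub messages: AtomicUsize,
}

impl ClientPublishListener for Counter {
    fn on_client_advertise(&self, _channel: &ClientChannel) {}
    fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}
    fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {
        self.messages.fetch_add(1, Ordering::SeqCst);
    }
}

impl ServerListener for Counter {
    fn client_publish(&self) -> Option<&dyn ClientPublishListener> {
        Some(self)
    }
}
```

With this shape, omitting any of the three methods from the `ClientPublishListener` impl is a compile error, while listeners that don't care about client publishing implement nothing extra.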
A few early comments on the trait, still reading through the rest.
rust/foxglove/src/websocket.rs
Outdated
pub trait ServerListener: Send + Sync {
    /// Callback invoked when a client message is received.
    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
    fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {}
Is the ClientChannelId globally unique, or locally unique to a particular client? I presume the latter? Might IDs be reused, from the perspective of an implementer? We should probably update the rust doc for that type, and add some commentary to these methods to help guide user expectations.
We should add a reference to https://github.com/foxglove/ws-protocol/blob/main/docs/spec.md#client-message-data. Same goes for the other methods, I think.
It should only be unique per client, which suggests maybe we should pass in some kind of identifier for the client as well (what? SocketAddr? make up an id?). The Python implementation doesn't address that, and has these same signatures.
The payload is fine, the message only contains the channel id and the payload, and we pass those separately to the callback. I'll double-check the other methods, but I believe that holds as well.
I added a global integer id for clients, so the subscriber can tell them apart
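As a rough sketch of how a global integer client id can be allocated and combined with a per-client channel id, assuming an atomic counter; the `ClientId::next` constructor and the `ClientChannelKey` type are hypothetical and may not match what the PR actually does.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Process-wide client identifier (hypothetical shape).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClientId(u32);

impl ClientId {
    /// Allocates the next id from a global counter.
    /// Ids are not reused unless the counter wraps around.
    pub fn next() -> Self {
        static NEXT_ID: AtomicU32 = AtomicU32::new(1);
        ClientId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
    }
}

/// A channel id is only unique per client, so a listener that tracks
/// client channels needs both parts as its map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClientChannelKey {
    pub client: ClientId,
    pub channel: u32,
}
```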
rust/foxglove/src/websocket.rs
Outdated
/// Callback invoked when a client message is received.
fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {}
/// Callback invoked when a client subscribes to a channel.
fn on_subscribe(&self, _channel_id: ChannelId) {}
For server-advertised channels, we allocate IDs internally. The ID is recoverable, but we're asking the trait implementation to do some extra legwork to map a channel ID back to something it understands (like a topic). Maybe we go a bit further.
Presumably we also need to pass the client identity, whatever that may be (perhaps a struct with a view over client metadata, so we can add getter methods later without breaking compatibility?). Actually, I think this comment applies to all of the methods in this trait.
What if we map the ChannelId back to the Channel and pass that instead to these two particular callbacks? That seems like it should work.
I implemented this
/// Provides a mechanism for registering callbacks for
/// handling client message events.
/// Provides a mechanism for registering callbacks for handling client message events.
/// All methods are optional.
pub trait ServerListener: Send + Sync {
Seems like we might also want callbacks for client connect and disconnect, so that an implementer knows when to flush client-specific state (about channel advertisements, subscriptions, etc.).
The existing implementations don't go that far, but it seems like a good idea to me
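To make the motivation concrete, here is a sketch of a listener-side tracker that would benefit from such callbacks. The `on_client_connect` and `on_client_disconnect` methods are hypothetical (the trait under review does not define them), and the integer type aliases stand in for the SDK's identifier types.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

// Hypothetical stand-ins for the SDK's identifier types.
type ClientId = u32;
type ChannelId = u64;

/// Tracks which channels each connected client is subscribed to.
#[derive(Default)]
struct SubscriptionTracker {
    by_client: Mutex<HashMap<ClientId, HashSet<ChannelId>>>,
}

impl SubscriptionTracker {
    /// Hypothetical connect callback: start with an empty subscription set.
    fn on_client_connect(&self, client: ClientId) {
        self.by_client.lock().unwrap().entry(client).or_default();
    }

    fn on_subscribe(&self, client: ClientId, channel: ChannelId) {
        self.by_client
            .lock()
            .unwrap()
            .entry(client)
            .or_default()
            .insert(channel);
    }

    /// Hypothetical disconnect callback: drop everything held for this client.
    fn on_client_disconnect(&self, client: ClientId) {
        self.by_client.lock().unwrap().remove(&client);
    }

    fn tracked_clients(&self) -> usize {
        self.by_client.lock().unwrap().len()
    }
}
```

Without a disconnect notification, entries for clients that drop their connection without unsubscribing would stay in the map indefinitely.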
rust/foxglove/src/websocket.rs
Outdated
for subscription_id in subscription_ids {
    if let Some((channel_id, _)) = subscriptions.remove_by_right(&subscription_id) {
        if let Some(handler) = self.server_listener.as_ref() {
            handler.on_unsubscribe(channel_id);
I don't think we should hold the subscriptions lock across the callback. If we do, we need to document that the SDK is non-reentrant.
Yeah, that's a good point
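A minimal sketch of the alternative, assuming a plain `std::sync::Mutex` over a map from subscription id to channel id (the real code uses a different map type and listener interface): collect the removed entries while holding the lock, release it, and only then invoke the callback. The `Client` struct and the closure-based callback are simplifications for illustration.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified stand-in for the per-client state in the server.
struct Client {
    /// Maps subscription id to channel id.
    subscriptions: Mutex<HashMap<u32, u64>>,
}

impl Client {
    fn unsubscribe(&self, subscription_ids: &[u32], on_unsubscribe: impl Fn(u64)) {
        // Mutate the map while holding the lock, remembering what was removed.
        let mut subs = self.subscriptions.lock().unwrap();
        let removed: Vec<u64> = subscription_ids
            .iter()
            .filter_map(|id| subs.remove(id))
            .collect();
        // Release the lock before running user code.
        drop(subs);

        // The callback may now call back into `Client` without deadlocking.
        for channel_id in removed {
            on_unsubscribe(channel_id);
        }
    }
}
```

The cost is a small temporary allocation; the benefit is that the callback can safely re-enter code that takes the same lock.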
rust/foxglove/src/websocket.rs
Outdated
    subscription.id
);
if let Some(handler) = self.server_listener.as_ref() {
    handler.on_subscribe(subscription.channel_id);
Probably ought to drop the channels lock before making these callbacks.
The main downside, I suppose, is that we might process a subscription callback after a channel is removed from the log context. But I think that's a straightforward corner case to document, and probably not one that makes much of a difference to an implementation of this trait.
rust/foxglove/src/websocket.rs
Outdated
/// Callback invoked when a client subscribes to a channel.
fn on_subscribe(&self, _channel_id: ChannelId) {}
/// Callback invoked when a client unsubscribes from a channel.
fn on_unsubscribe(&self, _channel_id: ChannelId) {}
We should maybe document the fact that we do not invoke this callback for a subscribed channel that is removed from the log context, and unadvertised to the client. Maybe that's obvious, but it doesn't hurt to call it out.
rust/foxglove/src/websocket.rs
Outdated
for id in channel_ids
    .iter()
    .cloned()
    .filter(|id| !channels_not_found.contains(id))
Since we have to alloc a new vec anyway, we might as well just store the channel IDs we did find and avoid the possible O(N^2) over the client payload.
channels_not_found is basically an error case that's not going to happen often, so I would expect this approach to perform best on average, but with worse worst-case performance (which is why I did it this way)
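For comparison, the single-pass alternative suggested above could be sketched as follows, assuming the set of known channel ids is available as a `HashSet`. The function name and the `u64` id type are made up for this example.

```rust
use std::collections::HashSet;

/// Splits the requested ids into (found, not_found) in one pass,
/// preserving request order within each group.
fn partition_channel_ids(requested: &[u64], known: &HashSet<u64>) -> (Vec<u64>, Vec<u64>) {
    requested.iter().copied().partition(|id| known.contains(id))
}
```

Each id is checked once against the hash set, so the work stays linear in the size of the client payload regardless of how many ids are missing.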
rust/foxglove/src/websocket.rs
Outdated
pub trait ServerListener: Send + Sync {
    /// Callback invoked when a client message is received.
    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
    fn on_message_data(&self, _channel_id: ClientChannelId, _payload: &[u8]) {}
Seems we look up the ClientChannel before invoking this callback. Maybe we should pass a (view over) ClientChannel, instead of asking the trait implementation to maintain its own <ClientChannelId, ClientChannel> map.
@@ -171,60 +182,30 @@ impl ConnectedClient {
            self.send_error(format!("Invalid message: {message}"));
            return;
        };
        let Some(server) = self.server.upgrade() else {
            tracing::error!("Server closed");
Maybe a bit loud? Is this something the user can do anything about?
pub use log_context::GlobalContextTest;
pub use log_sink::{ErrorSink, MockSink, RecordingSink};
use parking_lot::Mutex;

#[allow(dead_code)]
Still necessary?
Turns out not needed anymore
…ework invoking code to not hold locks while invoking callbacks, pass Channel refs instead of channel ids
…r-interface-for-subs-adverts
LG, just some minor dangling comments.
rust/foxglove/src/websocket.rs
Outdated
}
/// Callback invoked when a client subscribes to a channel.
/// Only invoked if the channel is associated with the server and isn't already subscribed to by the client.
fn on_subscribe(&self, _client_id: ClientId, _channel_id: Arc<Channel>) {}
Can rename these args as channel now, instead of channel_id.
Also, maybe we just pass a &Channel so that implementations aren't tempted to clone or hold on to the arc.
rust/foxglove/src/websocket.rs
Outdated
@@ -96,11 +103,23 @@ pub(crate) struct Server {
/// handling client message events.
pub trait ServerListener: Send + Sync {
    /// Callback invoked when a client message is received.
    fn on_message_data(&self, channel_id: ClientChannelId, payload: &[u8]);
    fn on_message_data(&self, _client_id: ClientId, _channel_id: ClientChannelId, _payload: &[u8]) {
Should we also pass &ClientChannel here (and on_client_unadvertise)? We're already doing the lookup before invoking the callback.
Based on the Slack discussion, I went with wrapper view structs for all the argument types, exposing the bare minimum for now.
We'll need to provide some way to look up a ClientChannel to get the full data, as that is now the one place where we provide less data than the existing callbacks in the original Python implementation.
LG. I suspect we'll need to expand ClientChannelView with schema information for it to be useful, but we can cross that bridge when we get there.
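As an illustration of the view-struct idea, here is a minimal sketch. The fields of `ClientChannel`, the getters on `ClientChannelView`, and the `schema_name` field are assumptions for this example and do not reflect the SDK's actual definitions.

```rust
/// Hypothetical internal representation; not handed to listeners directly.
struct ClientChannel {
    id: u32,
    topic: String,
    // Not exposed yet; a getter could be added later without breaking callers.
    schema_name: String,
}

/// Read-only view passed to callbacks. Because callers only see getters,
/// new ones can be added without changing any callback signature.
pub struct ClientChannelView<'a> {
    channel: &'a ClientChannel,
}

impl<'a> ClientChannelView<'a> {
    pub fn id(&self) -> u32 {
        self.channel.id
    }

    pub fn topic(&self) -> &'a str {
        &self.channel.topic
    }
}
```

Exposing schema information later would then be a matter of adding another getter to the view, which is an additive change.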
Add new callbacks to ServerListener:
This mirrors how the callbacks are defined in the Python implementation, and they function in a similar way (e.g. not firing for duplicate or erroneous requests)
I added RecordingServerListener to testutil to allow more easily testing ServerListener.
I modified existing tests to verify these callbacks are called at the appropriate times with the expected arguments, and not called for duplicate requests.
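A recording listener of this kind can be sketched roughly as below. This is not the actual RecordingServerListener from testutil; the `Event` enum, the method signatures, and the use of plain integers and strings for client and channel identity are all simplifications for illustration.

```rust
use std::sync::Mutex;

/// Events captured by the recorder (hypothetical shape).
#[derive(Debug, Clone, PartialEq, Eq)]
enum Event {
    Subscribe { client: u32, topic: String },
    Unsubscribe { client: u32, topic: String },
}

/// Records every callback invocation so a test can assert on them later.
#[derive(Default)]
struct RecordingListener {
    events: Mutex<Vec<Event>>,
}

impl RecordingListener {
    fn on_subscribe(&self, client: u32, topic: &str) {
        self.events.lock().unwrap().push(Event::Subscribe {
            client,
            topic: topic.to_string(),
        });
    }

    fn on_unsubscribe(&self, client: u32, topic: &str) {
        self.events.lock().unwrap().push(Event::Unsubscribe {
            client,
            topic: topic.to_string(),
        });
    }

    /// Returns and clears everything recorded so far.
    fn take_events(&self) -> Vec<Event> {
        std::mem::take(&mut *self.events.lock().unwrap())
    }
}
```

A test can then send a duplicate subscribe request and check that `take_events` still contains only one `Subscribe` entry.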