Skip to content

Commit

Permalink
Core implement ws server listener interface for subs and adverts (#189)
Browse files Browse the repository at this point in the history
Add new callbacks to ServerListener:

```
fn on_subscribe(&self, _channel_id: ChannelId) {}
fn on_unsubscribe(&self, _channel_id: ChannelId) {}
fn on_client_advertise(&self, _channel: &ClientChannel) {}
fn on_client_unadvertise(&self, _channel_id: ClientChannelId) {}
```

This mirrors how the callbacks are defined in the [Python
implementation](https://github.com/foxglove/ws-protocol/blob/main/python/src/foxglove_websocket/server/__init__.py#L508),
and they function in a similar way (e.g. not firing for duplicate or
erroneous requests)

I added RecordingServerListener to testutil to allow more easily testing
ServerListener.

I modified existing tests to verify these callbacks are called at the
appropriate times with the expected arguments, and not called for
duplicate requests.
  • Loading branch information
eloff authored Feb 7, 2025
1 parent 78b3c19 commit 62680fe
Show file tree
Hide file tree
Showing 11 changed files with 525 additions and 130 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/foxglove/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license = "MIT"
unstable = []

[dependencies]
bimap = "0.6.3"
schemars = "0.8.21"
arc-swap = "1.7.1"
base64 = "0.22.1"
Expand Down
7 changes: 4 additions & 3 deletions rust/foxglove/examples/unstable/client-publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use clap::Parser;
use foxglove::schemas::log::Level;
use foxglove::schemas::Log;
use foxglove::{
Capability, ClientChannelId, PartialMetadata, ServerListener, TypedChannel, WebSocketServer,
Capability, Client, ClientChannelView, PartialMetadata, ServerListener, TypedChannel,
WebSocketServer,
};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand All @@ -29,10 +30,10 @@ struct Cli {

struct ExampleCallbackHandler;
impl ServerListener for ExampleCallbackHandler {
fn on_message_data(&self, channel_id: ClientChannelId, message: &[u8]) {
fn on_message_data(&self, _client: Client, channel: ClientChannelView, message: &[u8]) {
let json: serde_json::Value =
serde_json::from_slice(message).expect("Failed to parse message");
println!("Received message on channel {channel_id}: {json}");
println!("Received message on channel {}: {json}", channel.id());
}
}

Expand Down
6 changes: 2 additions & 4 deletions rust/foxglove/src/cow_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,16 @@ mod tests {

let handle = thread::spawn(move || {
vec2.push(4);
vec2.get().to_vec()
});

vec.push(5);
let thread_state = handle.join().unwrap();
handle.join().unwrap();
let final_state = vec.get().to_vec();

// Old snapshot should still be valid and have original length
assert_eq!(old_snapshot.len(), 3);
// Both threads should see 5 items in their final state
// There should now be 5 items in the final state
assert_eq!(final_state.len(), 5);
assert_eq!(thread_state.len(), 5);
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion rust/foxglove/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ pub use runtime::shutdown_runtime;
pub(crate) use time::nanoseconds_since_epoch;
#[doc(hidden)]
#[cfg(feature = "unstable")]
pub use websocket::{Capability, Parameter, ParameterType, ParameterValue};
pub use websocket::{
Capability, ClientChannelId, Parameter, ParameterType, ParameterValue, ServerListener,
ChannelView, Client, ClientChannelId, ClientChannelView, ClientId, ServerListener,
};
pub use websocket_server::{WebSocketServer, WebSocketServerBlockingHandle, WebSocketServerHandle};

Expand Down
5 changes: 3 additions & 2 deletions rust/foxglove/src/tests/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_logging_to_file_and_live_sinks() {
ws_stream
};

// FG-9877: allow the server to handle the connection before creating the channel
// FG-10395 replace this with something more precise
tokio::time::sleep(Duration::from_millis(100)).await;

let channel = ChannelBuilder::new("/test-topic")
Expand Down Expand Up @@ -103,6 +103,7 @@ async fn test_logging_to_file_and_live_sinks() {
.expect("Failed to subscribe");

// Let subscription register before publishing
// FG-10395 replace this with something more precise
tokio::time::sleep(Duration::from_millis(100)).await;
}

Expand All @@ -122,7 +123,7 @@ async fn test_logging_to_file_and_live_sinks() {

channel.log(&msg);

// Ensure message has arrived
// FG-10395 replace this with something more precise
tokio::time::sleep(Duration::from_millis(100)).await;

let writer = handle.close().expect("Failed to flush log");
Expand Down
104 changes: 104 additions & 0 deletions rust/foxglove/src/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,109 @@
mod log_context;
mod log_sink;

use crate::channel::ChannelId;
use crate::websocket::{
ChannelView, Client, ClientChannelId, ClientChannelView, ClientId, ServerListener,
};
pub use log_context::GlobalContextTest;
pub use log_sink::{ErrorSink, MockSink, RecordingSink};
use parking_lot::Mutex;

#[allow(dead_code)]
pub(crate) struct ClientChannelInfo {
pub(crate) id: ClientChannelId,
pub(crate) topic: String,
}

impl From<ClientChannelView<'_>> for ClientChannelInfo {
fn from(channel: ClientChannelView) -> Self {
Self {
id: channel.id(),
topic: channel.topic().to_string(),
}
}
}

pub(crate) struct ChannelInfo {
pub(crate) id: ChannelId,
pub(crate) topic: String,
}

impl From<ChannelView<'_>> for ChannelInfo {
fn from(channel: ChannelView) -> Self {
Self {
id: channel.id(),
topic: channel.topic().to_string(),
}
}
}

pub(crate) struct RecordingServerListener {
message_data: Mutex<Vec<(ClientId, ClientChannelInfo, Vec<u8>)>>,
subscribe: Mutex<Vec<(ClientId, ChannelInfo)>>,
unsubscribe: Mutex<Vec<(ClientId, ChannelInfo)>>,
client_advertise: Mutex<Vec<(ClientId, ClientChannelInfo)>>,
client_unadvertise: Mutex<Vec<(ClientId, ClientChannelInfo)>>,
}

impl RecordingServerListener {
pub fn new() -> Self {
Self {
message_data: Mutex::new(Vec::new()),
subscribe: Mutex::new(Vec::new()),
unsubscribe: Mutex::new(Vec::new()),
client_advertise: Mutex::new(Vec::new()),
client_unadvertise: Mutex::new(Vec::new()),
}
}

#[allow(dead_code)]
pub fn take_message_data(&self) -> Vec<(ClientId, ClientChannelInfo, Vec<u8>)> {
std::mem::take(&mut self.message_data.lock())
}

pub fn take_subscribe(&self) -> Vec<(ClientId, ChannelInfo)> {
std::mem::take(&mut self.subscribe.lock())
}

pub fn take_unsubscribe(&self) -> Vec<(ClientId, ChannelInfo)> {
std::mem::take(&mut self.unsubscribe.lock())
}

#[allow(dead_code)]
pub fn take_client_advertise(&self) -> Vec<(ClientId, ClientChannelInfo)> {
std::mem::take(&mut self.client_advertise.lock())
}

#[allow(dead_code)]
pub fn take_client_unadvertise(&self) -> Vec<(ClientId, ClientChannelInfo)> {
std::mem::take(&mut self.client_unadvertise.lock())
}
}

impl ServerListener for RecordingServerListener {
fn on_message_data(&self, client: Client, channel: ClientChannelView, payload: &[u8]) {
let mut data = self.message_data.lock();
data.push((client.id(), channel.into(), payload.to_vec()));
}

fn on_subscribe(&self, client: Client, channel: ChannelView) {
let mut subs = self.subscribe.lock();
subs.push((client.id(), channel.into()));
}

fn on_unsubscribe(&self, client: Client, channel: ChannelView) {
let mut unsubs = self.unsubscribe.lock();
unsubs.push((client.id(), channel.into()));
}

fn on_client_advertise(&self, client: Client, channel: ClientChannelView) {
let mut adverts = self.client_advertise.lock();
adverts.push((client.id(), channel.into()));
}

fn on_client_unadvertise(&self, client: Client, channel: ClientChannelView) {
let mut unadverts = self.client_unadvertise.lock();
unadverts.push((client.id(), channel.into()));
}
}
Loading

0 comments on commit 62680fe

Please sign in to comment.