Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core implement ws server listener interface for subs and adverts #189

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/foxglove/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license = "MIT"
unstable = []

[dependencies]
bimap = "0.6.3"
schemars = "0.8.21"
arc-swap = "1.7.1"
base64 = "0.22.1"
Expand Down
7 changes: 4 additions & 3 deletions rust/foxglove/examples/unstable/client-publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use clap::Parser;
use foxglove::schemas::log::Level;
use foxglove::schemas::Log;
use foxglove::{
Capability, ClientChannelId, PartialMetadata, ServerListener, TypedChannel, WebSocketServer,
Capability, Client, ClientChannelView, PartialMetadata, ServerListener, TypedChannel,
WebSocketServer,
};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand All @@ -29,10 +30,10 @@ struct Cli {

struct ExampleCallbackHandler;
impl ServerListener for ExampleCallbackHandler {
fn on_message_data(&self, channel_id: ClientChannelId, message: &[u8]) {
fn on_message_data(&self, _client: Client, channel: ClientChannelView, message: &[u8]) {
let json: serde_json::Value =
serde_json::from_slice(message).expect("Failed to parse message");
println!("Received message on channel {channel_id}: {json}");
println!("Received message on channel {}: {json}", channel.id());
}
}

Expand Down
6 changes: 2 additions & 4 deletions rust/foxglove/src/cow_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,16 @@ mod tests {

let handle = thread::spawn(move || {
vec2.push(4);
vec2.get().to_vec()
});

vec.push(5);
let thread_state = handle.join().unwrap();
handle.join().unwrap();
let final_state = vec.get().to_vec();

// Old snapshot should still be valid and have original length
assert_eq!(old_snapshot.len(), 3);
// Both threads should see 5 items in their final state
// There should now be 5 items in the final state
assert_eq!(final_state.len(), 5);
assert_eq!(thread_state.len(), 5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this is no longer desired, but the comment above is now stale.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix the comment, this was a race condition. It would be 4 items if the thread runs before the push(5) after starting the thread.

}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion rust/foxglove/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ pub use runtime::shutdown_runtime;
pub(crate) use time::nanoseconds_since_epoch;
#[doc(hidden)]
#[cfg(feature = "unstable")]
pub use websocket::{Capability, Parameter, ParameterType, ParameterValue};
pub use websocket::{
Capability, ClientChannelId, Parameter, ParameterType, ParameterValue, ServerListener,
ChannelView, Client, ClientChannelId, ClientChannelView, ClientId, ServerListener,
};
pub use websocket_server::{WebSocketServer, WebSocketServerBlockingHandle, WebSocketServerHandle};

Expand Down
5 changes: 3 additions & 2 deletions rust/foxglove/src/tests/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_logging_to_file_and_live_sinks() {
ws_stream
};

// FG-9877: allow the server to handle the connection before creating the channel
// FG-10395 replace this with something more precise
tokio::time::sleep(Duration::from_millis(100)).await;

let channel = ChannelBuilder::new("/test-topic")
Expand Down Expand Up @@ -103,6 +103,7 @@ async fn test_logging_to_file_and_live_sinks() {
.expect("Failed to subscribe");

// Let subscription register before publishing
// FG-10395 replace this with something more precise
tokio::time::sleep(Duration::from_millis(100)).await;
}

Expand All @@ -122,7 +123,7 @@ async fn test_logging_to_file_and_live_sinks() {

channel.log(&msg);

// Ensure message has arrived
// FG-10395 replace this with something more precise
tokio::time::sleep(Duration::from_millis(100)).await;

let writer = handle.close().expect("Failed to flush log");
Expand Down
104 changes: 104 additions & 0 deletions rust/foxglove/src/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,109 @@
mod log_context;
mod log_sink;

use crate::channel::ChannelId;
use crate::websocket::{
ChannelView, Client, ClientChannelId, ClientChannelView, ClientId, ServerListener,
};
pub use log_context::GlobalContextTest;
pub use log_sink::{ErrorSink, MockSink, RecordingSink};
use parking_lot::Mutex;

#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out not needed anymore

pub(crate) struct ClientChannelInfo {
pub(crate) id: ClientChannelId,
pub(crate) topic: String,
}

impl From<ClientChannelView<'_>> for ClientChannelInfo {
fn from(channel: ClientChannelView) -> Self {
Self {
id: channel.id(),
topic: channel.topic().to_string(),
}
}
}

pub(crate) struct ChannelInfo {
pub(crate) id: ChannelId,
pub(crate) topic: String,
}

impl From<ChannelView<'_>> for ChannelInfo {
fn from(channel: ChannelView) -> Self {
Self {
id: channel.id(),
topic: channel.topic().to_string(),
}
}
}

pub(crate) struct RecordingServerListener {
message_data: Mutex<Vec<(ClientId, ClientChannelInfo, Vec<u8>)>>,
subscribe: Mutex<Vec<(ClientId, ChannelInfo)>>,
unsubscribe: Mutex<Vec<(ClientId, ChannelInfo)>>,
client_advertise: Mutex<Vec<(ClientId, ClientChannelInfo)>>,
client_unadvertise: Mutex<Vec<(ClientId, ClientChannelInfo)>>,
}

impl RecordingServerListener {
pub fn new() -> Self {
Self {
message_data: Mutex::new(Vec::new()),
subscribe: Mutex::new(Vec::new()),
unsubscribe: Mutex::new(Vec::new()),
client_advertise: Mutex::new(Vec::new()),
client_unadvertise: Mutex::new(Vec::new()),
}
}

#[allow(dead_code)]
pub fn take_message_data(&self) -> Vec<(ClientId, ClientChannelInfo, Vec<u8>)> {
std::mem::take(&mut self.message_data.lock())
}

pub fn take_subscribe(&self) -> Vec<(ClientId, ChannelInfo)> {
std::mem::take(&mut self.subscribe.lock())
}

pub fn take_unsubscribe(&self) -> Vec<(ClientId, ChannelInfo)> {
std::mem::take(&mut self.unsubscribe.lock())
}

#[allow(dead_code)]
pub fn take_client_advertise(&self) -> Vec<(ClientId, ClientChannelInfo)> {
std::mem::take(&mut self.client_advertise.lock())
}

#[allow(dead_code)]
pub fn take_client_unadvertise(&self) -> Vec<(ClientId, ClientChannelInfo)> {
std::mem::take(&mut self.client_unadvertise.lock())
}
}

impl ServerListener for RecordingServerListener {
fn on_message_data(&self, client: Client, channel: ClientChannelView, payload: &[u8]) {
let mut data = self.message_data.lock();
data.push((client.id(), channel.into(), payload.to_vec()));
}

fn on_subscribe(&self, client: Client, channel: ChannelView) {
let mut subs = self.subscribe.lock();
subs.push((client.id(), channel.into()));
}

fn on_unsubscribe(&self, client: Client, channel: ChannelView) {
let mut unsubs = self.unsubscribe.lock();
unsubs.push((client.id(), channel.into()));
}

fn on_client_advertise(&self, client: Client, channel: ClientChannelView) {
let mut adverts = self.client_advertise.lock();
adverts.push((client.id(), channel.into()));
}

fn on_client_unadvertise(&self, client: Client, channel: ClientChannelView) {
let mut unadverts = self.client_unadvertise.lock();
unadverts.push((client.id(), channel.into()));
}
}
Loading