Skip to content

Commit

Permalink
sdk: Basic connection metrics (#2299)
Browse files Browse the repository at this point in the history
  • Loading branch information
kim authored Feb 24, 2025
1 parent 668eef0 commit d92a3f5
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ description = "A Rust SDK for clients to interface with SpacetimeDB"
[dependencies]
spacetimedb-data-structures.workspace = true
spacetimedb-sats.workspace = true
spacetimedb-lib = { workspace = true, features = ["serde"]}
spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"]}
spacetimedb-client-api-messages.workspace = true
spacetimedb-metrics.workspace = true

thiserror.workspace = true
anymap.workspace = true
Expand All @@ -23,6 +24,8 @@ futures-channel.workspace = true
home.workspace = true
http.workspace = true
log.workspace = true
once_cell.workspace = true
prometheus.workspace = true
rand.workspace = true
tokio.workspace = true
tokio-tungstenite.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
mod callbacks;
mod client_cache;
mod db_connection;
mod metrics;
mod spacetime_module;
mod subscription;
mod websocket;
Expand Down Expand Up @@ -67,5 +68,6 @@ pub mod unstable {
//!
//! These may change incompatibly without a major version bump.
pub use crate::db_connection::set_connection_id;
pub use crate::metrics::{ClientMetrics, CLIENT_METRICS};
pub use spacetimedb_client_api_messages::websocket::CallReducerFlags;
}
20 changes: 20 additions & 0 deletions crates/sdk/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use once_cell::sync::Lazy;
use prometheus::{HistogramVec, IntCounterVec};
use spacetimedb_lib::ConnectionId;
use spacetimedb_metrics::metrics_group;

metrics_group!(
pub struct ClientMetrics {
#[name = spacetime_client_received_total]
#[help = "The cumulative number of received websocket messages"]
#[labels(db: Box<str>, connection_id: ConnectionId)]
pub websocket_received: IntCounterVec,

#[name = spacetime_client_received_msg_size]
#[help = "The size of received websocket messages"]
#[labels(db: Box<str>, connection_id: ConnectionId)]
pub websocket_received_msg_size: HistogramVec,
}
);

pub static CLIENT_METRICS: Lazy<ClientMetrics> = Lazy::new(ClientMetrics::new);
31 changes: 28 additions & 3 deletions crates/sdk/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream,
};

use crate::metrics::CLIENT_METRICS;

#[derive(Error, Debug, Clone)]
pub enum UriError {
#[error("Unknown URI scheme {scheme}, expected http, https, ws or wss")]
Expand Down Expand Up @@ -80,6 +82,8 @@ pub enum WsError {
}

pub(crate) struct WsConnection {
db_name: Box<str>,
connection_id: ConnectionId,
sock: WebSocketStream<MaybeTlsStream<TcpStream>>,
}

Expand Down Expand Up @@ -223,7 +227,11 @@ impl WsConnection {
uri,
source: Arc::new(source),
})?;
Ok(WsConnection { sock })
Ok(WsConnection {
db_name: db_name.into(),
connection_id,
sock,
})
}

pub(crate) fn parse_response(bytes: &[u8]) -> Result<ServerMessage<BsatnFormat>, WsError> {
Expand Down Expand Up @@ -268,6 +276,17 @@ impl WsConnection {
incoming_messages: mpsc::UnboundedSender<ServerMessage<BsatnFormat>>,
outgoing_messages: mpsc::UnboundedReceiver<ClientMessage<Bytes>>,
) {
let record_metrics = |msg_size: usize| {
CLIENT_METRICS
.websocket_received
.with_label_values(&self.db_name, &self.connection_id)
.inc();
CLIENT_METRICS
.websocket_received_msg_size
.with_label_values(&self.db_name, &self.connection_id)
.observe(msg_size as f64);
};

let mut outgoing_messages = Some(outgoing_messages);
loop {
tokio::select! {
Expand All @@ -280,6 +299,7 @@ impl WsConnection {
),

Ok(Some(WebSocketMessage::Binary(bytes))) => {
record_metrics(bytes.len());
match Self::parse_response(&bytes) {
Err(e) => Self::maybe_log_error::<(), _>(
"Error decoding WebSocketMessage::Binary payload",
Expand All @@ -292,9 +312,14 @@ impl WsConnection {
}
}

Ok(Some(WebSocketMessage::Ping(_))) => {}
Ok(Some(WebSocketMessage::Ping(payload))) => {
record_metrics(payload.len());
}

Ok(Some(other)) => log::warn!("Unexpected WebSocket message {:?}", other),
Ok(Some(other)) => {
log::warn!("Unexpected WebSocket message {:?}", other);
record_metrics(other.len());
},
},

// this is stupid. we want to handle the channel close *once*, and then disable this branch
Expand Down

2 comments on commit d92a3f5

@github-actions
Copy link

@github-actions github-actions bot commented on d92a3f5 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

@github-actions
Copy link

@github-actions github-actions bot commented on d92a3f5 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.