diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 59c8f1e6d..5bd10d554 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -55,7 +55,9 @@ use crate::error::RPCError; use crate::error::Timeout; use crate::log_id::LogIdOptionExt; use crate::log_id::RaftLogId; +use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; +use crate::metrics::RaftServerMetrics; use crate::metrics::ReplicationMetrics; use crate::network::RPCOption; use crate::network::RPCTypes; @@ -198,6 +200,8 @@ where pub(crate) rx_notify: mpsc::UnboundedReceiver>, pub(crate) tx_metrics: watch::Sender>, + pub(crate) tx_data_metrics: watch::Sender>, + pub(crate) tx_server_metrics: watch::Sender>, pub(crate) command_state: CommandState, @@ -522,6 +526,7 @@ where pub(crate) fn report_metrics(&self, replication: Option>) { let st = &self.engine.state; + let membership_config = st.membership_state.effective().stored_membership().clone(); let m = RaftMetrics { running_state: Ok(()), id: self.id, @@ -537,10 +542,10 @@ where // --- cluster --- state: st.server_state, current_leader: self.current_leader(), - membership_config: st.membership_state.effective().stored_membership().clone(), + membership_config: membership_config.clone(), // --- replication --- - replication, + replication: replication.clone(), }; tracing::debug!("report_metrics: {}", m.summary()); @@ -549,6 +554,36 @@ where if let Err(err) = res { tracing::error!(error=%err, id=display(self.id), "error reporting metrics"); } + + let data_metrics = RaftDataMetrics { + last_log: st.last_log_id().copied(), + last_applied: st.io_applied().copied(), + snapshot: st.io_snapshot_last_log_id().copied(), + purged: st.io_purged().copied(), + replication, + }; + self.tx_data_metrics.send_if_modified(|metrix| { + if data_metrics.ne(metrix) { + *metrix = data_metrics.clone(); + return true; + } + false + }); + + let server_metrics = RaftServerMetrics { + id: self.id, + vote: *st.io_state().vote(), + state: st.server_state, + current_leader: self.current_leader(), + membership_config, + }; + self.tx_server_metrics.send_if_modified(|metrix| { + if server_metrics.ne(metrix) { + *metrix = server_metrics.clone(); + return true; + } + false + }); } /// Handle the admin command `initialize`. diff --git a/openraft/src/metrics/mod.rs b/openraft/src/metrics/mod.rs index f67f3e7a5..8d672195a 100644 --- a/openraft/src/metrics/mod.rs +++ b/openraft/src/metrics/mod.rs @@ -38,7 +38,9 @@ mod wait_condition; use std::collections::BTreeMap; pub use metric::Metric; +pub use raft_metrics::RaftDataMetrics; pub use raft_metrics::RaftMetrics; +pub use raft_metrics::RaftServerMetrics; pub use wait::Wait; pub use wait::WaitError; pub(crate) use wait_condition::Condition; diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index e17c52318..e0bd6362b 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -140,3 +140,96 @@ where } } } + +/// Subset of RaftMetrics, only include data-related metrics +#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct RaftDataMetrics +where NID: NodeId +{ + pub last_log: Option>, + pub last_applied: Option>, + pub snapshot: Option>, + pub purged: Option>, + pub replication: Option>, +} + +impl fmt::Display for RaftDataMetrics +where NID: NodeId +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DataMetrics{{")?; + + write!( + f, + "last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}", + DisplayOption(&self.last_log), + DisplayOption(&self.last_applied), + DisplayOption(&self.snapshot), + DisplayOption(&self.purged), + self.replication + .as_ref() + .map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::>().join(",") }) + .unwrap_or_default(), + )?; + + write!(f, "}}")?; + Ok(()) + } +} + +impl MessageSummary> for RaftDataMetrics +where NID: NodeId +{ + fn summary(&self) -> String { + self.to_string() + } +} + +/// Subset of RaftMetrics, only include server-related metrics +#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub struct RaftServerMetrics +where + NID: NodeId, + N: Node, +{ + pub id: NID, + pub vote: Vote, + pub state: ServerState, + pub current_leader: Option, + pub membership_config: Arc>, +} + +impl fmt::Display for RaftServerMetrics +where + NID: NodeId, + N: Node, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ServerMetrics{{")?; + + write!( + f, + "id:{}, {:?}, vote:{}, leader:{}, membership:{}", + self.id, + self.state, + self.vote, + DisplayOption(&self.current_leader), + self.membership_config.summary(), + )?; + + write!(f, "}}")?; + Ok(()) + } +} + +impl MessageSummary> for RaftServerMetrics +where + NID: NodeId, + N: Node, +{ + fn summary(&self) -> String { + self.to_string() + } +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 8475eab6d..69e0f5c16 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -51,7 +51,9 @@ use crate::error::InitializeError; use crate::error::InstallSnapshotError; use crate::error::RaftError; use crate::membership::IntoNodes; +use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; +use crate::metrics::RaftServerMetrics; use crate::metrics::Wait; use crate::metrics::WaitError; use crate::network::RaftNetworkFactory; @@ -174,6 +176,8 @@ where C: RaftTypeConfig let (tx_api, rx_api) = mpsc::unbounded_channel(); let (tx_notify, rx_notify) = mpsc::unbounded_channel(); let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id)); + let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default()); + let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default()); let (tx_shutdown, rx_shutdown) = oneshot::channel(); let tick_handle = Tick::spawn( @@ -224,6 +228,8 @@ where C: RaftTypeConfig rx_notify, tx_metrics, + tx_data_metrics, + tx_server_metrics, command_state: CommandState::default(), span: core_span, @@ -240,6 +246,8 @@ where C: RaftTypeConfig tick_handle, tx_api, rx_metrics, + rx_data_metrics, + rx_server_metrics, tx_shutdown: Mutex::new(Some(tx_shutdown)), core_state: Mutex::new(CoreState::Running(core_handle)), }; @@ -822,6 +830,16 @@ where C: RaftTypeConfig self.inner.rx_metrics.clone() } + /// Get a handle to the data metrics channel. + pub fn data_metrics(&self) -> watch::Receiver> { + self.inner.rx_data_metrics.clone() + } + + /// Get a handle to the server metrics channel. + pub fn server_metrics(&self) -> watch::Receiver> { + self.inner.rx_server_metrics.clone() + } + /// Get a handle to wait for the metrics to satisfy some condition. /// /// If `timeout` is `None`, then it will wait forever(10 years). diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 69712ce4e..f8b7c9fcb 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -11,6 +11,8 @@ use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::TickHandle; use crate::error::Fatal; +use crate::metrics::RaftDataMetrics; +use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; use crate::AsyncRuntime; use crate::Config; @@ -28,6 +30,8 @@ where C: RaftTypeConfig pub(in crate::raft) tick_handle: TickHandle, pub(in crate::raft) tx_api: mpsc::UnboundedSender>, pub(in crate::raft) rx_metrics: watch::Receiver>, + pub(in crate::raft) rx_data_metrics: watch::Receiver>, + pub(in crate::raft) rx_server_metrics: watch::Receiver>, // TODO(xp): it does not need to be a async mutex. #[allow(clippy::type_complexity)] diff --git a/tests/tests/metrics/main.rs b/tests/tests/metrics/main.rs index 409d07b78..f212f60d9 100644 --- a/tests/tests/metrics/main.rs +++ b/tests/tests/metrics/main.rs @@ -9,6 +9,7 @@ mod fixtures; mod t10_current_leader; mod t10_purged; +mod t10_server_metrics_and_data_metrics; mod t20_metrics_state_machine_consistency; mod t30_leader_metrics; mod t40_metrics_wait; diff --git a/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs b/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs new file mode 100644 index 000000000..c85204e46 --- /dev/null +++ b/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use anyhow::Result; +use maplit::btreeset; +use openraft::Config; +#[allow(unused_imports)] use pretty_assertions::assert_eq; +#[allow(unused_imports)] use pretty_assertions::assert_ne; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Server metrics and data metrics method should work. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn server_metrics_and_data_metrics() -> Result<()> { + // Setup test dependencies. + let config = Arc::new( + Config { + enable_heartbeat: false, + enable_elect: false, + ..Default::default() + } + .validate()?, + ); + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + let node = router.get_raft_handle(&0)?; + let mut server_metrics = node.server_metrics(); + let data_metrics = node.data_metrics(); + + let current_leader = router.current_leader(0).await; + let leader = server_metrics.borrow_and_update().current_leader; + assert_eq!(leader, current_leader, "current_leader should be {:?}", current_leader); + + // Write some logs. + let n = 10; + tracing::info!(log_index, "--- write {} logs", n); + log_index += router.client_request_many(0, "foo", n).await?; + + let last_log_index = data_metrics.borrow().last_log.unwrap_or_default().index; + assert_eq!(last_log_index, log_index, "last_log_index should be {:?}", log_index); + assert!( + !server_metrics.borrow().has_changed(), + "server metrics should not update" + ); + Ok(()) +}