Skip to content

Commit

Permalink
Feature: add Raft::data_metrics() and Raft::server_metrics() (dat…
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Jan 14, 2024
1 parent 5447387 commit 8261589
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 2 deletions.
39 changes: 37 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ use crate::error::RPCError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::ReplicationMetrics;
use crate::network::RPCOption;
use crate::network::RPCTypes;
Expand Down Expand Up @@ -198,6 +200,8 @@ where
pub(crate) rx_notify: mpsc::UnboundedReceiver<Notify<C>>,

pub(crate) tx_metrics: watch::Sender<RaftMetrics<C::NodeId, C::Node>>,
pub(crate) tx_data_metrics: watch::Sender<RaftDataMetrics<C::NodeId>>,
pub(crate) tx_server_metrics: watch::Sender<RaftServerMetrics<C::NodeId, C::Node>>,

pub(crate) command_state: CommandState,

Expand Down Expand Up @@ -522,6 +526,7 @@ where
pub(crate) fn report_metrics(&self, replication: Option<ReplicationMetrics<C::NodeId>>) {
let st = &self.engine.state;

let membership_config = st.membership_state.effective().stored_membership().clone();
let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand All @@ -537,10 +542,10 @@ where
// --- cluster ---
state: st.server_state,
current_leader: self.current_leader(),
membership_config: st.membership_state.effective().stored_membership().clone(),
membership_config: membership_config.clone(),

// --- replication ---
replication,
replication: replication.clone(),
};

tracing::debug!("report_metrics: {}", m.summary());
Expand All @@ -549,6 +554,36 @@ where
if let Err(err) = res {
tracing::error!(error=%err, id=display(self.id), "error reporting metrics");
}

let data_metrics = RaftDataMetrics {
last_log: st.last_log_id().copied(),
last_applied: st.io_applied().copied(),
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),
replication,
};
self.tx_data_metrics.send_if_modified(|metrix| {
if data_metrics.ne(metrix) {
*metrix = data_metrics.clone();
return true;
}
false
});

let server_metrics = RaftServerMetrics {
id: self.id,
vote: *st.io_state().vote(),
state: st.server_state,
current_leader: self.current_leader(),
membership_config,
};
self.tx_server_metrics.send_if_modified(|metrix| {
if server_metrics.ne(metrix) {
*metrix = server_metrics.clone();
return true;
}
false
});
}

/// Handle the admin command `initialize`.
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ mod wait_condition;
use std::collections::BTreeMap;

pub use metric::Metric;
pub use raft_metrics::RaftDataMetrics;
pub use raft_metrics::RaftMetrics;
pub use raft_metrics::RaftServerMetrics;
pub use wait::Wait;
pub use wait::WaitError;
pub(crate) use wait_condition::Condition;
Expand Down
93 changes: 93 additions & 0 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,96 @@ where
}
}
}

/// Subset of RaftMetrics, only include data-related metrics
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftDataMetrics<NID>
where NID: NodeId
{
pub last_log: Option<LogId<NID>>,
pub last_applied: Option<LogId<NID>>,
pub snapshot: Option<LogId<NID>>,
pub purged: Option<LogId<NID>>,
pub replication: Option<ReplicationMetrics<NID>>,
}

impl<NID> fmt::Display for RaftDataMetrics<NID>
where NID: NodeId
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DataMetrics{{")?;

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
.unwrap_or_default(),
)?;

write!(f, "}}")?;
Ok(())
}
}

impl<NID> MessageSummary<RaftDataMetrics<NID>> for RaftDataMetrics<NID>
where NID: NodeId
{
fn summary(&self) -> String {
self.to_string()
}
}

/// Subset of RaftMetrics, only include server-related metrics
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
pub id: NID,
pub vote: Vote<NID>,
pub state: ServerState,
pub current_leader: Option<NID>,
pub membership_config: Arc<StoredMembership<NID, N>>,
}

impl<NID, N> fmt::Display for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ServerMetrics{{")?;

write!(
f,
"id:{}, {:?}, vote:{}, leader:{}, membership:{}",
self.id,
self.state,
self.vote,
DisplayOption(&self.current_leader),
self.membership_config.summary(),
)?;

write!(f, "}}")?;
Ok(())
}
}

impl<NID, N> MessageSummary<RaftServerMetrics<NID, N>> for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
fn summary(&self) -> String {
self.to_string()
}
}
18 changes: 18 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::RaftError;
use crate::membership::IntoNodes;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::network::RaftNetworkFactory;
Expand Down Expand Up @@ -174,6 +176,8 @@ where C: RaftTypeConfig
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default());
let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default());
let (tx_shutdown, rx_shutdown) = oneshot::channel();

let tick_handle = Tick::spawn(
Expand Down Expand Up @@ -224,6 +228,8 @@ where C: RaftTypeConfig
rx_notify,

tx_metrics,
tx_data_metrics,
tx_server_metrics,

command_state: CommandState::default(),
span: core_span,
Expand All @@ -240,6 +246,8 @@ where C: RaftTypeConfig
tick_handle,
tx_api,
rx_metrics,
rx_data_metrics,
rx_server_metrics,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
core_state: Mutex::new(CoreState::Running(core_handle)),
};
Expand Down Expand Up @@ -822,6 +830,16 @@ where C: RaftTypeConfig
self.inner.rx_metrics.clone()
}

/// Get a handle to the data metrics channel.
pub fn data_metrics(&self) -> watch::Receiver<RaftDataMetrics<C::NodeId>> {
self.inner.rx_data_metrics.clone()
}

/// Get a handle to the server metrics channel.
pub fn server_metrics(&self) -> watch::Receiver<RaftServerMetrics<C::NodeId, C::Node>> {
self.inner.rx_server_metrics.clone()
}

/// Get a handle to wait for the metrics to satisfy some condition.
///
/// If `timeout` is `None`, then it will wait forever(10 years).
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::AsyncRuntime;
use crate::Config;
Expand All @@ -28,6 +30,8 @@ where C: RaftTypeConfig
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,
pub(in crate::raft) rx_data_metrics: watch::Receiver<RaftDataMetrics<C::NodeId>>,
pub(in crate::raft) rx_server_metrics: watch::Receiver<RaftServerMetrics<C::NodeId, C::Node>>,

// TODO(xp): it does not need to be a async mutex.
#[allow(clippy::type_complexity)]
Expand Down
1 change: 1 addition & 0 deletions tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod fixtures;

mod t10_current_leader;
mod t10_purged;
mod t10_server_metrics_and_data_metrics;
mod t20_metrics_state_machine_consistency;
mod t30_leader_metrics;
mod t40_metrics_wait;
49 changes: 49 additions & 0 deletions tests/tests/metrics/t10_server_metrics_and_data_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::sync::Arc;

use anyhow::Result;
use maplit::btreeset;
use openraft::Config;
#[allow(unused_imports)] use pretty_assertions::assert_eq;
#[allow(unused_imports)] use pretty_assertions::assert_ne;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Server metrics and data metrics method should work.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn server_metrics_and_data_metrics() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
enable_heartbeat: false,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

let node = router.get_raft_handle(&0)?;
let mut server_metrics = node.server_metrics();
let data_metrics = node.data_metrics();

let current_leader = router.current_leader(0).await;
let leader = server_metrics.borrow_and_update().current_leader;
assert_eq!(leader, current_leader, "current_leader should be {:?}", current_leader);

// Write some logs.
let n = 10;
tracing::info!(log_index, "--- write {} logs", n);
log_index += router.client_request_many(0, "foo", n).await?;

let last_log_index = data_metrics.borrow().last_log.unwrap_or_default().index;
assert_eq!(last_log_index, log_index, "last_log_index should be {:?}", log_index);
assert!(
!server_metrics.borrow().has_changed(),
"server metrics should not update"
);
Ok(())
}

0 comments on commit 8261589

Please sign in to comment.