From 648c3e701a7bf4443960b934b4a63de89d3149df Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 9 Jan 2025 22:57:38 +0800 Subject: [PATCH 1/2] initial Signed-off-by: Calvin Neo --- components/server/src/server.rs | 3 +- components/server/src/server2.rs | 3 +- components/test_raftstore-v2/src/server.rs | 3 +- components/test_raftstore/src/server.rs | 3 +- .../src/mock_cluster/v1/server.rs | 2 + proxy_components/proxy_server/src/run.rs | 56 ++++++++++++++++++- src/server/metrics.rs | 5 ++ src/server/server.rs | 2 + src/server/service/kv.rs | 42 +++++++++++++- src/server/service/mod.rs | 5 +- 10 files changed, 114 insertions(+), 10 deletions(-) diff --git a/components/server/src/server.rs b/components/server/src/server.rs index a083ce5769a..70e8558ba89 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -101,7 +101,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DefaultGrpcMessageObserver, DiagnosticsService}, status_server::StatusServer, tablet_snap::NoSnapshotCache, ttl::TtlChecker, @@ -864,6 +864,7 @@ where debug_thread_pool, health_controller, self.resource_manager.clone(), + Arc::new(DefaultGrpcMessageObserver::default()), ) .unwrap_or_else(|e| fatal!("failed to create server: {}", e)); cfg_controller.register( diff --git a/components/server/src/server2.rs b/components/server/src/server2.rs index 74a9ffaffa9..8dd198e57b0 100644 --- a/components/server/src/server2.rs +++ b/components/server/src/server2.rs @@ -90,7 +90,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DefaultGrpcMessageObserver, DiagnosticsService}, status_server::StatusServer, KvEngineFactoryBuilder, NodeV2, RaftKv2, Server, CPU_CORES_QUOTA_GAUGE, GRPC_THREAD_PREFIX, MEMORY_LIMIT_GAUGE, @@ -814,6 +814,7 @@ where debug_thread_pool, health_controller, self.resource_manager.clone(), + Arc::new(DefaultGrpcMessageObserver::default()), ) .unwrap_or_else(|e| fatal!("failed to create server: {}", e)); cfg_controller.register( diff --git a/components/test_raftstore-v2/src/server.rs b/components/test_raftstore-v2/src/server.rs index 47830c77730..5998526fc31 100644 --- a/components/test_raftstore-v2/src/server.rs +++ b/components/test_raftstore-v2/src/server.rs @@ -61,7 +61,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DefaultGrpcMessageObserver, DiagnosticsService}, ConnectionBuilder, Error, Extension, NodeV2, PdStoreAddrResolver, RaftClient, RaftKv2, Result as ServerResult, Server, ServerTransport, }, @@ -644,6 +644,7 @@ impl ServerCluster { debug_thread_pool.clone(), health_controller.clone(), resource_manager.clone(), + Arc::new(DefaultGrpcMessageObserver::default()), ) .unwrap(); svr.register_service(create_diagnostics(diag_service.clone())); diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index d73157c51ac..e9499d44b32 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -66,7 +66,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve::{self, StoreAddrResolver}, - service::DebugService, + service::{DebugService, DefaultGrpcMessageObserver}, tablet_snap::NoSnapshotCache, ConnectionBuilder, Error, MultiRaftServer, PdStoreAddrResolver, RaftClient, 
RaftKv, Result as ServerResult, Server, ServerTransport, @@ -616,6 +616,7 @@ impl ServerCluster { debug_thread_pool.clone(), health_controller.clone(), resource_manager.clone(), + Arc::new(DefaultGrpcMessageObserver::default()), ) .unwrap(); svr.register_service(create_import_sst(import_service.clone())); diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs index b517db3f4f3..ad2c95cda87 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs @@ -58,6 +58,7 @@ use tikv::{ lock_manager::LockManager, raftkv::ReplicaReadLockChecker, resolve::{self, StoreAddrResolver}, + service::DefaultGrpcMessageObserver, tablet_snap::NoSnapshotCache, ConnectionBuilder, Error, MultiRaftServer, PdStoreAddrResolver, RaftClient, RaftKv, Result as ServerResult, Server, ServerTransport, @@ -546,6 +547,7 @@ impl ServerCluster { debug_thread_pool.clone(), health_controller.clone(), None, + Arc::new(DefaultGrpcMessageObserver::default()), ) .unwrap(); svr.register_service(create_import_sst(import_service.clone())); diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index c8f127e937b..1b9bc9c733f 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -84,7 +84,7 @@ use tikv::{ gc_worker::GcWorker, raftkv::ReplicaReadLockChecker, resolve, - service::{DebugService, DiagnosticsService}, + service::{DebugService, DiagnosticsService, RaftGrpcMessageObserver}, tablet_snap::NoSnapshotCache, ttl::TtlChecker, KvEngineFactoryBuilder, MultiRaftServer, RaftKv, Server, CPU_CORES_QUOTA_GAUGE, @@ -106,7 +106,10 @@ use tikv_util::{ config::{ensure_dir_exist, ReadableDuration, VersionTrack}, error, quota_limiter::{QuotaLimitConfigManager, QuotaLimiter}, - sys::{disk, register_memory_usage_high_water, thread::ThreadBuildWrapper, SysQuota}, + sys::{ + disk, memory_usage_reaches_high_water, register_memory_usage_high_water, + thread::ThreadBuildWrapper, SysQuota, + }, thread_group::GroupProperties, time::{Instant, Monitor}, worker::{Builder as WorkerBuilder, LazyWorker, Scheduler}, @@ -572,6 +575,52 @@ impl TiKvServer { } } +#[derive(Clone)] +pub struct TiFlashGrpcMessageObserver { + reject_messages_on_memory_ratio: f64, +} + +impl TiFlashGrpcMessageObserver { + pub fn new(reject_messages_on_memory_ratio: f64) -> Self { + Self { + reject_messages_on_memory_ratio, + } + } +} + +impl RaftGrpcMessageObserver for TiFlashGrpcMessageObserver { + fn should_reject_append(&self) -> Option { + if self.reject_messages_on_memory_ratio < f64::EPSILON { + return Some(false); + } + + let mut usage = 0; + Some(memory_usage_reaches_high_water(&mut usage)) + } + + fn should_reject_snapshot(&self) -> Option { + info!( + "!!!!!!! should_reject_snapshot {}", + self.reject_messages_on_memory_ratio + ); + + if self.reject_messages_on_memory_ratio < f64::EPSILON { + return Some(false); + } + + let mut usage = 0; + + let x = memory_usage_reaches_high_water(&mut usage); + + info!( + "!!!!!!! 
should_reject_snapshot memory_usage_reaches_high_water {}", + x + ); + + Some(x) + } +} + const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000); const DEFAULT_MEMTRACE_FLUSH_INTERVAL: Duration = Duration::from_millis(1_000); const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1); @@ -1275,6 +1324,9 @@ impl TiKvServer { debug_thread_pool, health_controller, self.resource_manager.clone(), + Arc::new(TiFlashGrpcMessageObserver::new( + self.core.config.server.reject_messages_on_memory_ratio, + )), ) .unwrap_or_else(|e| fatal!("failed to create server: {}", e)); diff --git a/src/server/metrics.rs b/src/server/metrics.rs index ae06155f4a1..6b4d6328ace 100644 --- a/src/server/metrics.rs +++ b/src/server/metrics.rs @@ -462,6 +462,11 @@ lazy_static! { "Count for rejected Raft append messages" ) .unwrap(); + pub static ref RAFT_SNAPSHOT_REJECTS: IntCounter = register_int_counter!( + "tikv_server_raft_snapshot_rejects", + "Count for rejected Raft snapshot messages" + ) + .unwrap(); pub static ref SNAP_LIMIT_TRANSPORT_BYTES_COUNTER: IntCounterVec = register_int_counter_vec!( "tikv_snapshot_limit_transport_bytes", "Total snapshot limit transport used", diff --git a/src/server/server.rs b/src/server/server.rs index e96fba10afd..d4c10e87d52 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -166,6 +166,7 @@ where debug_thread_pool: Arc, health_controller: HealthController, resource_manager: Option>, + raft_message_observer: Arc, ) -> Result { // A helper thread (or pool) for transport layer. let stats_pool = if cfg.value().stats_concurrency > 0 { @@ -211,6 +212,7 @@ where resource_manager, health_controller.clone(), health_feedback_interval, + raft_message_observer, ); let builder_factory = Box::new(BuilderFactory::new( kv_service, diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index be895af869a..272e69ad46f 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -71,6 +71,24 @@ use crate::{ const GRPC_MSG_MAX_BATCH_SIZE: usize = 128; const GRPC_MSG_NOTIFY_SIZE: usize = 8; +pub trait RaftGrpcMessageObserver { + fn should_reject_append(&self) -> Option; + fn should_reject_snapshot(&self) -> Option; +} + +#[derive(Clone, Default)] +pub struct DefaultGrpcMessageObserver {} + +impl RaftGrpcMessageObserver for DefaultGrpcMessageObserver { + fn should_reject_append(&self) -> Option { + None + } + + fn should_reject_snapshot(&self) -> Option { + None + } +} + /// Service handles the RPC messages for the `Tikv` service. 
pub struct Service { cluster_id: u64, @@ -103,6 +121,8 @@ pub struct Service { health_controller: HealthController, health_feedback_interval: Option, health_feedback_seq: Arc, + + raft_message_observer: Arc, } impl Drop for Service { @@ -130,6 +150,7 @@ impl Clone for Service Service { resource_manager: Option>, health_controller: HealthController, health_feedback_interval: Option, + raft_message_observer: Arc, ) -> Self { let now_unix = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -174,6 +196,7 @@ impl Service { health_controller, health_feedback_interval, health_feedback_seq: Arc::new(AtomicU64::new(now_unix)), + raft_message_observer, } } @@ -754,12 +777,16 @@ impl Tikv for Service { let store_id = self.store_id; let ch = self.storage.get_engine().raft_extension(); let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio; + let ob = self.raft_message_observer.clone(); let res = async move { let mut stream = stream.map_err(Error::from); while let Some(msg) = stream.try_next().await? { RAFT_MESSAGE_RECV_COUNTER.inc(); - let reject = needs_reject_raft_append(reject_messages_on_memory_ratio); + let reject = match ob.should_reject_append() { + Some(b) => b, + None => needs_reject_raft_append(reject_messages_on_memory_ratio), + }; if let Err(err @ RaftStoreError::StoreNotMatch { .. }) = Self::handle_raft_message(store_id, &ch, msg, reject) { @@ -808,13 +835,17 @@ impl Tikv for Service { let ch = self.storage.get_engine().raft_extension(); let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio; + let ob = self.raft_message_observer.clone(); let res = async move { let mut stream = stream.map_err(Error::from); while let Some(mut batch_msg) = stream.try_next().await? { let len = batch_msg.get_msgs().len(); RAFT_MESSAGE_RECV_COUNTER.inc_by(len as u64); RAFT_MESSAGE_BATCH_SIZE.observe(len as f64); - let reject = needs_reject_raft_append(reject_messages_on_memory_ratio); + let reject = match ob.should_reject_append() { + Some(b) => b, + None => needs_reject_raft_append(reject_messages_on_memory_ratio), + }; for msg in batch_msg.take_msgs().into_iter() { if let Err(err @ RaftStoreError::StoreNotMatch { .. }) = Self::handle_raft_message(store_id, &ch, msg, reject) @@ -854,6 +885,13 @@ impl Tikv for Service { stream: RequestStream, sink: ClientStreamingSink, ) { + if let Some(rej) = self.raft_message_observer.should_reject_snapshot() { + if rej { + info!("!!!!!!! 
should_reject_snapshot 2"); + RAFT_SNAPSHOT_REJECTS.inc(); + return; + } + }; let task = SnapTask::Recv { stream, sink }; if let Err(e) = self.snap_scheduler.schedule(task) { let err_msg = format!("{}", e); diff --git a/src/server/service/mod.rs b/src/server/service/mod.rs index 00369a4ceae..a086c990b92 100644 --- a/src/server/service/mod.rs +++ b/src/server/service/mod.rs @@ -10,8 +10,9 @@ pub use self::{ diagnostics::Service as DiagnosticsService, kv::{ batch_commands_request, batch_commands_response, future_flashback_to_version, - future_prepare_flashback_to_version, GrpcRequestDuration, MeasuredBatchResponse, - MeasuredSingleResponse, Service as KvService, + future_prepare_flashback_to_version, DefaultGrpcMessageObserver, GrpcRequestDuration, + MeasuredBatchResponse, MeasuredSingleResponse, RaftGrpcMessageObserver, + Service as KvService, }, }; From 0e7c3498411b1245c5d0b2feeae15923fd4c1cf5 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 10 Jan 2025 12:07:11 +0800 Subject: [PATCH 2/2] add tests Signed-off-by: Calvin Neo --- proxy_components/proxy_server/src/run.rs | 15 +------- src/server/service/kv.rs | 30 +++++++++++----- tests/failpoints/cases/test_server.rs | 46 ++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 23 deletions(-) diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index 1b9bc9c733f..59192399f1f 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -599,25 +599,12 @@ impl RaftGrpcMessageObserver for TiFlashGrpcMessageObserver { } fn should_reject_snapshot(&self) -> Option { - info!( - "!!!!!!! should_reject_snapshot {}", - self.reject_messages_on_memory_ratio - ); - if self.reject_messages_on_memory_ratio < f64::EPSILON { return Some(false); } let mut usage = 0; - - let x = memory_usage_reaches_high_water(&mut usage); - - info!( - "!!!!!!! should_reject_snapshot memory_usage_reaches_high_water {}", - x - ); - - Some(x) + Some(memory_usage_reaches_high_water(&mut usage)) } } diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 272e69ad46f..f1aab2a95ab 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -81,10 +81,12 @@ pub struct DefaultGrpcMessageObserver {} impl RaftGrpcMessageObserver for DefaultGrpcMessageObserver { fn should_reject_append(&self) -> Option { + fail::fail_point!("force_reject_raft_append_message", |_| Some(true)); None } fn should_reject_snapshot(&self) -> Option { + fail::fail_point!("force_reject_raft_snapshot_message", |_| Some(true)); None } } @@ -205,6 +207,7 @@ impl Service { ch: &E::RaftExtension, msg: RaftMessage, reject: bool, + reject_snap: bool, ) -> RaftStoreResult<()> { let to_store_id = msg.get_to_peer().get_store_id(); if to_store_id != store_id { @@ -213,7 +216,9 @@ impl Service { my_store_id: store_id, }); } - if reject && msg.get_message().get_msg_type() == MessageType::MsgAppend { + if (reject && msg.get_message().get_msg_type() == MessageType::MsgAppend) + || (reject_snap && msg.get_message().get_msg_type() == MessageType::MsgSnapshot) + { RAFT_APPEND_REJECTS.inc(); let id = msg.get_region_id(); let peer_id = msg.get_message().get_from(); @@ -787,8 +792,13 @@ impl Tikv for Service { Some(b) => b, None => needs_reject_raft_append(reject_messages_on_memory_ratio), }; + let reject_snap = if let Some(b) = ob.should_reject_snapshot() { + b + } else { + false + }; if let Err(err @ RaftStoreError::StoreNotMatch { .. 
}) = - Self::handle_raft_message(store_id, &ch, msg, reject) + Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap) { // Return an error here will break the connection, only do that for // `StoreNotMatch` to let tikv to resolve a correct address from PD @@ -846,9 +856,14 @@ impl Tikv for Service { Some(b) => b, None => needs_reject_raft_append(reject_messages_on_memory_ratio), }; + let reject_snap = if let Some(b) = ob.should_reject_snapshot() { + b + } else { + false + }; for msg in batch_msg.take_msgs().into_iter() { if let Err(err @ RaftStoreError::StoreNotMatch { .. }) = - Self::handle_raft_message(store_id, &ch, msg, reject) + Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap) { // Return an error here will break the connection, only do that for // `StoreNotMatch` to let tikv to resolve a correct address from PD @@ -885,12 +900,9 @@ impl Tikv for Service { stream: RequestStream, sink: ClientStreamingSink, ) { - if let Some(rej) = self.raft_message_observer.should_reject_snapshot() { - if rej { - info!("!!!!!!! should_reject_snapshot 2"); - RAFT_SNAPSHOT_REJECTS.inc(); - return; - } + if let Some(true) = self.raft_message_observer.should_reject_snapshot() { + RAFT_SNAPSHOT_REJECTS.inc(); + return; }; let task = SnapTask::Recv { stream, sink }; if let Err(e) = self.snap_scheduler.schedule(task) { diff --git a/tests/failpoints/cases/test_server.rs b/tests/failpoints/cases/test_server.rs index dfbb883179c..7db7d2ad6d4 100644 --- a/tests/failpoints/cases/test_server.rs +++ b/tests/failpoints/cases/test_server.rs @@ -156,3 +156,49 @@ fn test_serving_status() { thread::sleep(Duration::from_millis(200)); assert_eq!(check(), ServingStatus::Serving); } + +#[test] +fn test_raft_message_observer() { + use raft::eraftpb::ConfChangeType; + + test_util::init_log_for_test(); + let mut cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + + cluster.must_put(b"k1", b"v1"); + + fail::cfg("force_reject_raft_append_message", "return").unwrap(); + fail::cfg("force_reject_raft_snapshot_message", "return").unwrap(); + + cluster.pd_client.add_peer(r1, new_peer(2, 2)); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + must_get_none(&cluster.get_engine(2), b"k1"); + + fail::remove("force_reject_raft_append_message"); + fail::remove("force_reject_raft_snapshot_message"); + + cluster.pd_client.must_have_peer(r1, new_peer(2, 2)); + cluster.pd_client.must_add_peer(r1, new_peer(3, 3)); + + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + fail::cfg("force_reject_raft_append_message", "return").unwrap(); + + let _ = cluster.async_put(b"k3", b"v3").unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + must_get_none(&cluster.get_engine(2), b"k3"); + must_get_none(&cluster.get_engine(3), b"k3"); + + fail::remove("force_reject_raft_append_message"); + + cluster.must_put(b"k4", b"v4"); + for id in 1..=3 { + must_get_equal(&cluster.get_engine(id), b"k4", b"v4"); + } +}
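
Not part of the patch itself: a minimal sketch of how a downstream component might implement the RaftGrpcMessageObserver trait introduced in PATCH 1/2, alongside the TiFlashGrpcMessageObserver and DefaultGrpcMessageObserver implementations above. The generic arguments are elided in the quoted diff text, so the Option<bool> return types and the trait path tikv::server::service::RaftGrpcMessageObserver are inferred from the Some(true)/Some(false) call sites and the mod.rs re-export; PausableSnapshotObserver and pause_handle are hypothetical names, and the snippet assumes it is compiled against the patched tikv crate.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Exported from src/server/service/mod.rs in this patch series.
use tikv::server::service::RaftGrpcMessageObserver;

/// Hypothetical observer that lets an external controller pause snapshot
/// ingestion (for example while the store is overloaded) without touching
/// append traffic.
#[derive(Clone, Default)]
pub struct PausableSnapshotObserver {
    paused: Arc<AtomicBool>,
}

impl PausableSnapshotObserver {
    /// Handle the controller flips to start or stop rejecting snapshots.
    pub fn pause_handle(&self) -> Arc<AtomicBool> {
        self.paused.clone()
    }
}

impl RaftGrpcMessageObserver for PausableSnapshotObserver {
    fn should_reject_append(&self) -> Option<bool> {
        // Returning None keeps the existing fallback, i.e. the
        // needs_reject_raft_append(reject_messages_on_memory_ratio) path
        // in raft_message/batch_raft.
        None
    }

    fn should_reject_snapshot(&self) -> Option<bool> {
        // Returning Some(true) short-circuits the snapshot RPC and bumps
        // RAFT_SNAPSHOT_REJECTS, per the handler added in this patch.
        Some(self.paused.load(Ordering::Relaxed))
    }
}

Such an observer would be passed where the call sites above currently pass Arc::new(DefaultGrpcMessageObserver::default()), and the controller keeps the Arc returned by pause_handle() as its switch; because should_reject_append returns None, append handling stays on the stock memory-ratio check.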