Support grpc message observer #411

Open. Wants to merge 2 commits into base branch raftstore-proxy.
3 changes: 2 additions & 1 deletion components/server/src/server.rs
@@ -101,7 +101,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DefaultGrpcMessageObserver, DiagnosticsService},
status_server::StatusServer,
tablet_snap::NoSnapshotCache,
ttl::TtlChecker,
@@ -864,6 +864,7 @@ where
debug_thread_pool,
health_controller,
self.resource_manager.clone(),
Arc::new(DefaultGrpcMessageObserver::default()),
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));
cfg_controller.register(
3 changes: 2 additions & 1 deletion components/server/src/server2.rs
@@ -90,7 +90,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DefaultGrpcMessageObserver, DiagnosticsService},
status_server::StatusServer,
KvEngineFactoryBuilder, NodeV2, RaftKv2, Server, CPU_CORES_QUOTA_GAUGE, GRPC_THREAD_PREFIX,
MEMORY_LIMIT_GAUGE,
@@ -814,6 +814,7 @@ where
debug_thread_pool,
health_controller,
self.resource_manager.clone(),
Arc::new(DefaultGrpcMessageObserver::default()),
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));
cfg_controller.register(
3 changes: 2 additions & 1 deletion components/test_raftstore-v2/src/server.rs
@@ -61,7 +61,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DefaultGrpcMessageObserver, DiagnosticsService},
ConnectionBuilder, Error, Extension, NodeV2, PdStoreAddrResolver, RaftClient, RaftKv2,
Result as ServerResult, Server, ServerTransport,
},
@@ -644,6 +644,7 @@ impl<EK: KvEngine> ServerCluster<EK> {
debug_thread_pool.clone(),
health_controller.clone(),
resource_manager.clone(),
Arc::new(DefaultGrpcMessageObserver::default()),
)
.unwrap();
svr.register_service(create_diagnostics(diag_service.clone()));
3 changes: 2 additions & 1 deletion components/test_raftstore/src/server.rs
@@ -66,7 +66,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve::{self, StoreAddrResolver},
service::DebugService,
service::{DebugService, DefaultGrpcMessageObserver},
tablet_snap::NoSnapshotCache,
ConnectionBuilder, Error, MultiRaftServer, PdStoreAddrResolver, RaftClient, RaftKv,
Result as ServerResult, Server, ServerTransport,
@@ -616,6 +616,7 @@ impl ServerCluster {
debug_thread_pool.clone(),
health_controller.clone(),
resource_manager.clone(),
Arc::new(DefaultGrpcMessageObserver::default()),
)
.unwrap();
svr.register_service(create_import_sst(import_service.clone()));
Changes to an additional file (file name not shown in the rendered diff)
@@ -58,6 +58,7 @@ use tikv::{
lock_manager::LockManager,
raftkv::ReplicaReadLockChecker,
resolve::{self, StoreAddrResolver},
service::DefaultGrpcMessageObserver,
tablet_snap::NoSnapshotCache,
ConnectionBuilder, Error, MultiRaftServer, PdStoreAddrResolver, RaftClient, RaftKv,
Result as ServerResult, Server, ServerTransport,
@@ -546,6 +547,7 @@ impl ServerCluster {
debug_thread_pool.clone(),
health_controller.clone(),
None,
Arc::new(DefaultGrpcMessageObserver::default()),
)
.unwrap();
svr.register_service(create_import_sst(import_service.clone()));
43 changes: 41 additions & 2 deletions proxy_components/proxy_server/src/run.rs
@@ -84,7 +84,7 @@ use tikv::{
gc_worker::GcWorker,
raftkv::ReplicaReadLockChecker,
resolve,
service::{DebugService, DiagnosticsService},
service::{DebugService, DiagnosticsService, RaftGrpcMessageObserver},
tablet_snap::NoSnapshotCache,
ttl::TtlChecker,
KvEngineFactoryBuilder, MultiRaftServer, RaftKv, Server, CPU_CORES_QUOTA_GAUGE,
@@ -106,7 +106,10 @@ use tikv_util::{
config::{ensure_dir_exist, ReadableDuration, VersionTrack},
error,
quota_limiter::{QuotaLimitConfigManager, QuotaLimiter},
sys::{disk, register_memory_usage_high_water, thread::ThreadBuildWrapper, SysQuota},
sys::{
disk, memory_usage_reaches_high_water, register_memory_usage_high_water,
thread::ThreadBuildWrapper, SysQuota,
},
thread_group::GroupProperties,
time::{Instant, Monitor},
worker::{Builder as WorkerBuilder, LazyWorker, Scheduler},
@@ -572,6 +575,39 @@ impl<CER: ConfiguredRaftEngine, F: KvFormat> TiKvServer<CER, F> {
}
}

#[derive(Clone)]
pub struct TiFlashGrpcMessageObserver {
reject_messages_on_memory_ratio: f64,
}

impl TiFlashGrpcMessageObserver {
pub fn new(reject_messages_on_memory_ratio: f64) -> Self {
Self {
reject_messages_on_memory_ratio,
}
}
}

impl RaftGrpcMessageObserver for TiFlashGrpcMessageObserver {
fn should_reject_append(&self) -> Option<bool> {
if self.reject_messages_on_memory_ratio < f64::EPSILON {
return Some(false);
}

let mut usage = 0;
Some(memory_usage_reaches_high_water(&mut usage))
}

fn should_reject_snapshot(&self) -> Option<bool> {
if self.reject_messages_on_memory_ratio < f64::EPSILON {
return Some(false);
}

let mut usage = 0;
Some(memory_usage_reaches_high_water(&mut usage))
}
}

const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
const DEFAULT_MEMTRACE_FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);
@@ -1275,6 +1311,9 @@ impl<ER: RaftEngine, F: KvFormat> TiKvServer<ER, F> {
debug_thread_pool,
health_controller,
self.resource_manager.clone(),
Arc::new(TiFlashGrpcMessageObserver::new(
self.core.config.server.reject_messages_on_memory_ratio,
)),
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));

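For reference, a minimal sketch (not part of this diff) of how the Option<bool> returned by a RaftGrpcMessageObserver is interpreted by the gRPC service, per the kv.rs changes further down: Some(true) forces rejection, Some(false) forces acceptance, and None defers to TiKV's existing behavior (the memory-ratio check for appends, plain acceptance for snapshots). The helper names below are illustrative only.

    // Illustrative sketch; `default_memory_check` stands in for
    // `needs_reject_raft_append(reject_messages_on_memory_ratio)`.
    fn resolve_append_reject(
        hint: Option<bool>, // RaftGrpcMessageObserver::should_reject_append()
        default_memory_check: impl Fn() -> bool,
    ) -> bool {
        match hint {
            Some(decision) => decision,     // the observer overrides the default policy
            None => default_memory_check(), // fall back to the memory-ratio check
        }
    }

    fn resolve_snapshot_reject(hint: Option<bool>) -> bool {
        hint.unwrap_or(false) // snapshots pass unless the observer rejects them
    }

Under this contract, TiFlashGrpcMessageObserver above effectively disables rejection when reject_messages_on_memory_ratio is zero, and otherwise rejects both appends and snapshots once memory usage reaches the high-water mark.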
5 changes: 5 additions & 0 deletions src/server/metrics.rs
@@ -462,6 +462,11 @@ lazy_static! {
"Count for rejected Raft append messages"
)
.unwrap();
pub static ref RAFT_SNAPSHOT_REJECTS: IntCounter = register_int_counter!(
"tikv_server_raft_snapshot_rejects",
"Count for rejected Raft snapshot messages"
)
.unwrap();
pub static ref SNAP_LIMIT_TRANSPORT_BYTES_COUNTER: IntCounterVec = register_int_counter_vec!(
"tikv_snapshot_limit_transport_bytes",
"Total snapshot limit transport used",
2 changes: 2 additions & 0 deletions src/server/server.rs
@@ -166,6 +166,7 @@ where
debug_thread_pool: Arc<Runtime>,
health_controller: HealthController,
resource_manager: Option<Arc<ResourceGroupManager>>,
raft_message_observer: Arc<dyn RaftGrpcMessageObserver + Send + Sync>,
) -> Result<Self> {
// A helper thread (or pool) for transport layer.
let stats_pool = if cfg.value().stats_concurrency > 0 {
@@ -211,6 +212,7 @@ where
resource_manager,
health_controller.clone(),
health_feedback_interval,
raft_message_observer,
);
let builder_factory = Box::new(BuilderFactory::new(
kv_service,
60 changes: 55 additions & 5 deletions src/server/service/kv.rs
@@ -71,6 +71,26 @@ use crate::{
const GRPC_MSG_MAX_BATCH_SIZE: usize = 128;
const GRPC_MSG_NOTIFY_SIZE: usize = 8;

pub trait RaftGrpcMessageObserver {
fn should_reject_append(&self) -> Option<bool>;
fn should_reject_snapshot(&self) -> Option<bool>;
}

#[derive(Clone, Default)]
pub struct DefaultGrpcMessageObserver {}

impl RaftGrpcMessageObserver for DefaultGrpcMessageObserver {
fn should_reject_append(&self) -> Option<bool> {
fail::fail_point!("force_reject_raft_append_message", |_| Some(true));
None
}

fn should_reject_snapshot(&self) -> Option<bool> {
fail::fail_point!("force_reject_raft_snapshot_message", |_| Some(true));
None
}
}

/// Service handles the RPC messages for the `Tikv` service.
pub struct Service<E: Engine, L: LockManager, F: KvFormat> {
cluster_id: u64,
@@ -103,6 +123,8 @@ pub struct Service<E: Engine, L: LockManager, F: KvFormat> {
health_controller: HealthController,
health_feedback_interval: Option<Duration>,
health_feedback_seq: Arc<AtomicU64>,

raft_message_observer: Arc<dyn RaftGrpcMessageObserver + Send + Sync>,
}

impl<E: Engine, L: LockManager, F: KvFormat> Drop for Service<E, L, F> {
Expand Down Expand Up @@ -130,6 +152,7 @@ impl<E: Engine + Clone, L: LockManager + Clone, F: KvFormat> Clone for Service<E
health_controller: self.health_controller.clone(),
health_feedback_seq: self.health_feedback_seq.clone(),
health_feedback_interval: self.health_feedback_interval,
raft_message_observer: self.raft_message_observer.clone(),
}
}
}
@@ -152,6 +175,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
resource_manager: Option<Arc<ResourceGroupManager>>,
health_controller: HealthController,
health_feedback_interval: Option<Duration>,
raft_message_observer: Arc<dyn RaftGrpcMessageObserver + Send + Sync>,
) -> Self {
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
@@ -174,6 +198,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
health_controller,
health_feedback_interval,
health_feedback_seq: Arc::new(AtomicU64::new(now_unix)),
raft_message_observer,
}
}

@@ -182,6 +207,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
ch: &E::RaftExtension,
msg: RaftMessage,
reject: bool,
reject_snap: bool,
) -> RaftStoreResult<()> {
let to_store_id = msg.get_to_peer().get_store_id();
if to_store_id != store_id {
@@ -190,7 +216,9 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
my_store_id: store_id,
});
}
if reject && msg.get_message().get_msg_type() == MessageType::MsgAppend {
if (reject && msg.get_message().get_msg_type() == MessageType::MsgAppend)
|| (reject_snap && msg.get_message().get_msg_type() == MessageType::MsgSnapshot)
{
RAFT_APPEND_REJECTS.inc();
let id = msg.get_region_id();
let peer_id = msg.get_message().get_from();
@@ -754,14 +782,23 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
let store_id = self.store_id;
let ch = self.storage.get_engine().raft_extension();
let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio;
let ob = self.raft_message_observer.clone();

let res = async move {
let mut stream = stream.map_err(Error::from);
while let Some(msg) = stream.try_next().await? {
RAFT_MESSAGE_RECV_COUNTER.inc();
let reject = needs_reject_raft_append(reject_messages_on_memory_ratio);
let reject = match ob.should_reject_append() {
Some(b) => b,
None => needs_reject_raft_append(reject_messages_on_memory_ratio),
};
let reject_snap = if let Some(b) = ob.should_reject_snapshot() {
b
} else {
false
};
if let Err(err @ RaftStoreError::StoreNotMatch { .. }) =
Self::handle_raft_message(store_id, &ch, msg, reject)
Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap)
{
// Returning an error here will break the connection; only do that for
// `StoreNotMatch`, to let TiKV resolve a correct address from PD
@@ -808,16 +845,25 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
let ch = self.storage.get_engine().raft_extension();
let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio;

let ob = self.raft_message_observer.clone();
let res = async move {
let mut stream = stream.map_err(Error::from);
while let Some(mut batch_msg) = stream.try_next().await? {
let len = batch_msg.get_msgs().len();
RAFT_MESSAGE_RECV_COUNTER.inc_by(len as u64);
RAFT_MESSAGE_BATCH_SIZE.observe(len as f64);
let reject = needs_reject_raft_append(reject_messages_on_memory_ratio);
let reject = match ob.should_reject_append() {
Some(b) => b,
None => needs_reject_raft_append(reject_messages_on_memory_ratio),
};
let reject_snap = if let Some(b) = ob.should_reject_snapshot() {
b
} else {
false
};
for msg in batch_msg.take_msgs().into_iter() {
if let Err(err @ RaftStoreError::StoreNotMatch { .. }) =
Self::handle_raft_message(store_id, &ch, msg, reject)
Self::handle_raft_message(store_id, &ch, msg, reject, reject_snap)
{
// Returning an error here will break the connection; only do that for
// `StoreNotMatch`, to let TiKV resolve a correct address from PD
@@ -854,6 +900,10 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
stream: RequestStream<SnapshotChunk>,
sink: ClientStreamingSink<Done>,
) {
if let Some(true) = self.raft_message_observer.should_reject_snapshot() {
RAFT_SNAPSHOT_REJECTS.inc();
return;
};
let task = SnapTask::Recv { stream, sink };
if let Err(e) = self.snap_scheduler.schedule(task) {
let err_msg = format!("{}", e);
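As an illustration of the new trait's contract (this example is not part of the PR), an embedder could supply its own observer, for instance one that defers append decisions to TiKV's default logic but rejects incoming snapshots while an operator-controlled pause flag is set:

    // Hypothetical example; the type name and pause flag are illustrative only.
    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    #[derive(Clone)]
    pub struct PausedSnapshotObserver {
        paused: Arc<AtomicBool>,
    }

    impl RaftGrpcMessageObserver for PausedSnapshotObserver {
        fn should_reject_append(&self) -> Option<bool> {
            None // defer to needs_reject_raft_append()
        }

        fn should_reject_snapshot(&self) -> Option<bool> {
            // Reject snapshots while paused; otherwise keep the default behavior.
            if self.paused.load(Ordering::Relaxed) {
                Some(true)
            } else {
                None
            }
        }
    }

Such an observer would be passed to Server::new as an Arc<dyn RaftGrpcMessageObserver + Send + Sync>, in the same position where the call sites above pass Arc::new(DefaultGrpcMessageObserver::default()).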
5 changes: 3 additions & 2 deletions src/server/service/mod.rs
@@ -10,8 +10,9 @@ pub use self::{
diagnostics::Service as DiagnosticsService,
kv::{
batch_commands_request, batch_commands_response, future_flashback_to_version,
future_prepare_flashback_to_version, GrpcRequestDuration, MeasuredBatchResponse,
MeasuredSingleResponse, Service as KvService,
future_prepare_flashback_to_version, DefaultGrpcMessageObserver, GrpcRequestDuration,
MeasuredBatchResponse, MeasuredSingleResponse, RaftGrpcMessageObserver,
Service as KvService,
},
};

46 changes: 46 additions & 0 deletions tests/failpoints/cases/test_server.rs
@@ -156,3 +156,49 @@ fn test_serving_status() {
thread::sleep(Duration::from_millis(200));
assert_eq!(check(), ServingStatus::Serving);
}

#[test]
fn test_raft_message_observer() {
use raft::eraftpb::ConfChangeType;

test_util::init_log_for_test();
let mut cluster = new_server_cluster(0, 3);
cluster.pd_client.disable_default_operator();
let r1 = cluster.run_conf_change();

cluster.must_put(b"k1", b"v1");

fail::cfg("force_reject_raft_append_message", "return").unwrap();
fail::cfg("force_reject_raft_snapshot_message", "return").unwrap();

cluster.pd_client.add_peer(r1, new_peer(2, 2));

std::thread::sleep(std::time::Duration::from_millis(500));

must_get_none(&cluster.get_engine(2), b"k1");

fail::remove("force_reject_raft_append_message");
fail::remove("force_reject_raft_snapshot_message");

cluster.pd_client.must_have_peer(r1, new_peer(2, 2));
cluster.pd_client.must_add_peer(r1, new_peer(3, 3));

must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

fail::cfg("force_reject_raft_append_message", "return").unwrap();

let _ = cluster.async_put(b"k3", b"v3").unwrap();

std::thread::sleep(std::time::Duration::from_millis(500));

must_get_none(&cluster.get_engine(2), b"k3");
must_get_none(&cluster.get_engine(3), b"k3");

fail::remove("force_reject_raft_append_message");

cluster.must_put(b"k4", b"v4");
for id in 1..=3 {
must_get_equal(&cluster.get_engine(id), b"k4", b"v4");
}
}