From 1451649d74e3e1a7c0c4be497b95055024c36082 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Thu, 29 Dec 2022 21:01:34 +0530 Subject: [PATCH 1/2] Make BRPC connection timeout configurable. --- src/braft/node.cpp | 6 ++++++ src/braft/remote_file_copier.cpp | 6 +++++- src/braft/replicator.cpp | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/braft/node.cpp b/src/braft/node.cpp index dc96daf8..eac4ac4b 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -61,6 +61,10 @@ DEFINE_bool(raft_trace_append_entry_latency, false, "trace append entry latency"); BRPC_VALIDATE_GFLAG(raft_trace_append_entry_latency, brpc::PassValidate); +DEFINE_int32(raft_rpc_channel_connect_timeout_ms, 200, + "Timeout in milliseconds for establishing connections of RPCs"); +BRPC_VALIDATE_GFLAG(raft_rpc_channel_connect_timeout_ms, brpc::PositiveInteger); + DECLARE_bool(raft_enable_leader_lease); #ifndef UNIT_TEST @@ -1611,6 +1615,7 @@ void NodeImpl::pre_vote(std::unique_lock* lck, bool triggered) { brpc::ChannelOptions options; options.connection_type = brpc::CONNECTION_TYPE_SINGLE; options.max_retry = 0; + options.connect_timeout_ms = braft::FLAGS_raft_rpc_channel_connect_timeout_ms; brpc::Channel channel; if (0 != channel.Init(iter->addr, &options)) { LOG(WARNING) << "node " << _group_id << ":" << _server_id @@ -1715,6 +1720,7 @@ void NodeImpl::request_peers_to_vote(const std::set& peers, } brpc::ChannelOptions options; options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.connect_timeout_ms = braft::FLAGS_raft_rpc_channel_connect_timeout_ms; options.max_retry = 0; brpc::Channel channel; if (0 != channel.Init(iter->addr, &options)) { diff --git a/src/braft/remote_file_copier.cpp b/src/braft/remote_file_copier.cpp index 684b59bc..04e72290 100644 --- a/src/braft/remote_file_copier.cpp +++ b/src/braft/remote_file_copier.cpp @@ -42,6 +42,8 @@ DEFINE_bool(raft_enable_throttle_when_install_snapshot, true, BRPC_VALIDATE_GFLAG(raft_enable_throttle_when_install_snapshot, ::brpc::PassValidate); +DECLARE_int32(raft_rpc_channel_connect_timeout_ms); + RemoteFileCopier::RemoteFileCopier() : _reader_id(0) , _throttle(NULL) @@ -65,7 +67,9 @@ int RemoteFileCopier::init(const std::string& uri, FileSystemAdaptor* fs, << " in " << uri; return -1; } - if (_channel.Init(ip_and_port.as_string().c_str(), NULL) != 0) { + brpc::ChannelOptions channel_opt; + channel_opt.connect_timeout_ms = braft::FLAGS_raft_rpc_channel_connect_timeout_ms; + if (_channel.Init(ip_and_port.as_string().c_str(), &channel_opt) != 0) { LOG(ERROR) << "Fail to init Channel to " << ip_and_port; return -1; } diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 2e0d1e17..7330085b 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -111,7 +111,7 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) { } Replicator* r = new Replicator(); brpc::ChannelOptions channel_opt; - //channel_opt.connect_timeout_ms = *options.heartbeat_timeout_ms; + channel_opt.connect_timeout_ms = -1; channel_opt.timeout_ms = -1; // We don't need RPC timeout if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) { LOG(ERROR) << "Fail to init sending channel" From 13bcb3d45a67510426414299f3d83989190c9b0b Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 30 Dec 2022 17:04:10 +0530 Subject: [PATCH 2/2] Remove unnecessary braft namespace. --- src/braft/node.cpp | 4 ++-- src/braft/remote_file_copier.cpp | 2 +- src/braft/replicator.cpp | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/braft/node.cpp b/src/braft/node.cpp index eac4ac4b..f5802f7f 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -1615,7 +1615,7 @@ void NodeImpl::pre_vote(std::unique_lock* lck, bool triggered) { brpc::ChannelOptions options; options.connection_type = brpc::CONNECTION_TYPE_SINGLE; options.max_retry = 0; - options.connect_timeout_ms = braft::FLAGS_raft_rpc_channel_connect_timeout_ms; + options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; brpc::Channel channel; if (0 != channel.Init(iter->addr, &options)) { LOG(WARNING) << "node " << _group_id << ":" << _server_id @@ -1720,7 +1720,7 @@ void NodeImpl::request_peers_to_vote(const std::set& peers, } brpc::ChannelOptions options; options.connection_type = brpc::CONNECTION_TYPE_SINGLE; - options.connect_timeout_ms = braft::FLAGS_raft_rpc_channel_connect_timeout_ms; + options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; options.max_retry = 0; brpc::Channel channel; if (0 != channel.Init(iter->addr, &options)) { diff --git a/src/braft/remote_file_copier.cpp b/src/braft/remote_file_copier.cpp index 04e72290..534e7bff 100644 --- a/src/braft/remote_file_copier.cpp +++ b/src/braft/remote_file_copier.cpp @@ -68,7 +68,7 @@ int RemoteFileCopier::init(const std::string& uri, FileSystemAdaptor* fs, return -1; } brpc::ChannelOptions channel_opt; - channel_opt.connect_timeout_ms = braft::FLAGS_raft_rpc_channel_connect_timeout_ms; + channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; if (_channel.Init(ip_and_port.as_string().c_str(), &channel_opt) != 0) { LOG(ERROR) << "Fail to init Channel to " << ip_and_port; return -1; diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 7330085b..d64d8385 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -50,6 +50,8 @@ BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms, DECLARE_int64(raft_append_entry_high_lat_us); DECLARE_bool(raft_trace_append_entry_latency); +DECLARE_int32(raft_rpc_channel_connect_timeout_ms); + static bvar::LatencyRecorder g_send_entries_latency("raft_send_entries"); static bvar::LatencyRecorder g_normalized_send_entries_latency( "raft_send_entries_normalized"); @@ -111,7 +113,7 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) { } Replicator* r = new Replicator(); brpc::ChannelOptions channel_opt; - channel_opt.connect_timeout_ms = -1; + channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; channel_opt.timeout_ms = -1; // We don't need RPC timeout if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) { LOG(ERROR) << "Fail to init sending channel"