diff --git a/src/braft/node.cpp b/src/braft/node.cpp index dc96daf8..f5802f7f 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -61,6 +61,10 @@ DEFINE_bool(raft_trace_append_entry_latency, false, "trace append entry latency"); BRPC_VALIDATE_GFLAG(raft_trace_append_entry_latency, brpc::PassValidate); +DEFINE_int32(raft_rpc_channel_connect_timeout_ms, 200, + "Timeout in milliseconds for establishing connections of RPCs"); +BRPC_VALIDATE_GFLAG(raft_rpc_channel_connect_timeout_ms, brpc::PositiveInteger); + DECLARE_bool(raft_enable_leader_lease); #ifndef UNIT_TEST @@ -1611,6 +1615,7 @@ void NodeImpl::pre_vote(std::unique_lock* lck, bool triggered) { brpc::ChannelOptions options; options.connection_type = brpc::CONNECTION_TYPE_SINGLE; options.max_retry = 0; + options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; brpc::Channel channel; if (0 != channel.Init(iter->addr, &options)) { LOG(WARNING) << "node " << _group_id << ":" << _server_id @@ -1715,6 +1720,7 @@ void NodeImpl::request_peers_to_vote(const std::set& peers, } brpc::ChannelOptions options; options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; options.max_retry = 0; brpc::Channel channel; if (0 != channel.Init(iter->addr, &options)) { diff --git a/src/braft/remote_file_copier.cpp b/src/braft/remote_file_copier.cpp index 684b59bc..534e7bff 100644 --- a/src/braft/remote_file_copier.cpp +++ b/src/braft/remote_file_copier.cpp @@ -42,6 +42,8 @@ DEFINE_bool(raft_enable_throttle_when_install_snapshot, true, BRPC_VALIDATE_GFLAG(raft_enable_throttle_when_install_snapshot, ::brpc::PassValidate); +DECLARE_int32(raft_rpc_channel_connect_timeout_ms); + RemoteFileCopier::RemoteFileCopier() : _reader_id(0) , _throttle(NULL) @@ -65,7 +67,9 @@ int RemoteFileCopier::init(const std::string& uri, FileSystemAdaptor* fs, << " in " << uri; return -1; } - if (_channel.Init(ip_and_port.as_string().c_str(), NULL) != 0) { + brpc::ChannelOptions channel_opt; + channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; + if (_channel.Init(ip_and_port.as_string().c_str(), &channel_opt) != 0) { LOG(ERROR) << "Fail to init Channel to " << ip_and_port; return -1; } diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 2e0d1e17..d64d8385 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -50,6 +50,8 @@ BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms, DECLARE_int64(raft_append_entry_high_lat_us); DECLARE_bool(raft_trace_append_entry_latency); +DECLARE_int32(raft_rpc_channel_connect_timeout_ms); + static bvar::LatencyRecorder g_send_entries_latency("raft_send_entries"); static bvar::LatencyRecorder g_normalized_send_entries_latency( "raft_send_entries_normalized"); @@ -111,7 +113,7 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) { } Replicator* r = new Replicator(); brpc::ChannelOptions channel_opt; - //channel_opt.connect_timeout_ms = *options.heartbeat_timeout_ms; + channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms; channel_opt.timeout_ms = -1; // We don't need RPC timeout if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) { LOG(ERROR) << "Fail to init sending channel"