Skip to content

Commit

Permalink
[enhancement](brpc) remove client from brpc cache if the underlying c…
Browse files Browse the repository at this point in the history
…hannel has error (#47487)

### What problem does this PR solve?

If the channel in the stub has error, it should not be reused any more,
it should be removed from the cache.
  • Loading branch information
yiguolei authored Feb 5, 2025
1 parent 3509647 commit e042e9f
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 2 deletions.
87 changes: 85 additions & 2 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/dns_cache.h"
#include "util/network_util.h"

Expand All @@ -56,6 +58,80 @@ using StubMap = phmap::parallel_flat_hash_map<

namespace doris {

class FailureDetectClosure : public ::google::protobuf::Closure {
public:
FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st,
::google::protobuf::RpcController* controller,
::google::protobuf::Closure* done)
: _channel_st(channel_st), _controller(controller), _done(done) {}

void Run() override {
Defer defer {[&]() { delete this; }};
// All brpc related API will use brpc::Controller, so that it is safe
// to do static cast here.
auto* cntl = static_cast<brpc::Controller*>(_controller);
if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
Status error_st = Status::NetworkError(
"Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(),
cntl->latency_us());
LOG(WARNING) << error_st;
_channel_st->update(error_st);
}
// Sometimes done == nullptr, for example hand_shake API.
if (_done != nullptr) {
_done->Run();
}
// _done->Run may throw exception, so that move delete this to Defer.
// delete this;
}

private:
std::shared_ptr<AtomicStatus> _channel_st;
::google::protobuf::RpcController* _controller;
::google::protobuf::Closure* _done;
};

// This channel will use FailureDetectClosure to wrap the original closure
// If some non-recoverable rpc failure happens, it will save the error status in
// _channel_st.
// And brpc client cache will depend on it to detect if the client is health.
class FailureDetectChannel : public ::brpc::Channel {
public:
FailureDetectChannel() : ::brpc::Channel() {
_channel_st = std::make_shared<AtomicStatus>(); // default OK
}
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request, google::protobuf::Message* response,
google::protobuf::Closure* done) override {
FailureDetectClosure* failure_detect_closure = nullptr;
if (done != nullptr) {
// If done == nullptr, then it means the call is sync call, so that should not
// gen a failure detect closure for it. Or it will core.
failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done);
}
::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure);
// Done == nullptr, it is a sync call, should also deal with the bad channel.
if (done == nullptr) {
auto* cntl = static_cast<brpc::Controller*>(controller);
if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
Status error_st = Status::NetworkError(
"Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
berror(cntl->ErrorCode()), cntl->ErrorText(),
BackendOptions::get_localhost(), cntl->latency_us());
LOG(WARNING) << error_st;
_channel_st->update(error_st);
}
}
}

std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; }

private:
std::shared_ptr<AtomicStatus> _channel_st;
};

template <class T>
class BrpcClientCache {
public:
Expand Down Expand Up @@ -99,7 +175,14 @@ class BrpcClientCache {
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
DCHECK(stub_ptr != nullptr);
return stub_ptr;
// All client created from this cache will use FailureDetectChannel, so it is
// safe to do static cast here.
// Check if the base channel is OK, if not ignore the stub and create new one.
if (static_cast<FailureDetectChannel*>(stub_ptr->channel())->channel_status()->ok()) {
return stub_ptr;
} else {
_stub_map.erase(host_port);
}
}

// new one stub and insert into map
Expand Down Expand Up @@ -148,7 +231,7 @@ class BrpcClientCache {
options.timeout_ms = 2000;
options.max_retry = 10;

std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel());
int ret_code = 0;
if (host_port.find("://") == std::string::npos) {
ret_code = channel->Init(host_port.c_str(), &options);
Expand Down
47 changes: 47 additions & 0 deletions be/test/util/brpc_client_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,51 @@ TEST_F(BrpcClientCacheTest, invalid) {
EXPECT_EQ(nullptr, stub1);
}

TEST_F(BrpcClientCacheTest, failure) {
BrpcClientCache<PBackendService_Stub> cache;
TNetworkAddress address;
address.hostname = "127.0.0.1";
address.port = 123;
std::shared_ptr<PBackendService_Stub> stub1 = cache.get_client(address);
EXPECT_NE(nullptr, stub1);
std::shared_ptr<PBackendService_Stub> stub2 = cache.get_client(address);
EXPECT_NE(nullptr, stub2);
// The channel is ok, so that the stub is the same
EXPECT_EQ(stub1, stub2);
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub1->channel())->channel_status()->ok());

// update channel st to error, will get a new stub
static_cast<FailureDetectChannel*>(stub1->channel())
->channel_status()
->update(Status::NetworkError("test brpc error"));
std::shared_ptr<PBackendService_Stub> stub3 = cache.get_client(address);
EXPECT_NE(nullptr, stub3);
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
// Then will get a new brpc stub not the previous one.
EXPECT_NE(stub2, stub3);
// The previous channel is not ok.
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub2->channel())->channel_status()->ok());

// Call handshake method, it will trigger host is down error. It is a sync call, not use closure.
cache.available(stub3, address.hostname, address.port);
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());

std::shared_ptr<PBackendService_Stub> stub4 = cache.get_client(address);
EXPECT_NE(nullptr, stub4);
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());

// Call handshake method, it will trigger host is down error. It is a async all, will use closure.
std::string message = "hello doris!";
PHandShakeRequest request;
request.set_hello(message);
PHandShakeResponse response;
brpc::Controller cntl4;
stub4->hand_shake(&cntl4, &request, &response, brpc::DoNothing());
brpc::Join(cntl4.call_id());
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());

// Check map size is 1
EXPECT_EQ(1, cache.size());
}

} // namespace doris

0 comments on commit e042e9f

Please sign in to comment.