Skip to content

Commit

Permalink
IMPALA-12150: Use protocol version to isolate cluster components
Browse files Browse the repository at this point in the history
Some Thrift request/response structs in CatalogService were changed to
add new variables in the middle, which caused cross version
incompatibility issue for CatalogService.

Impala cluster membership is managed by the statestore. During upgrade
scenarios where different versions of Impala daemons are upgraded one
at a time, the upgraded daemons have incompatible message formats.
Even through protocol versions numbers were already defined for
Statestore and Catalog Services, they were not used. The Statestore and
Catalog server did not check the protocol version in the requests, which
allowed incompatible Impala daemons to join one cluster. This causes
unexpected query failures during rolling upgrade.

We need a way to detect this and enforce that some rules are followed:
 - Statestore refuses the registration requests from incompatible
   subscribers.
 - Catalog server refuses the requests from incompatible clients.
 - Scheduler assigns tasks to a group of compatible executors.

This patch isolate Impala daemons into separate clusters based on
protocol versions of Statestore service to prevent incompatible Impala
daemons from communicating with each other. It covers the Thrift RPC
communications between catalogd and coordinators, and communication
between statestore and its subscribers (executor, coordinators,
catalogd and admissiond). This change should work for future upgrade.

Following changes were made:
 - Bump StatestoreServiceVersion and CatalogServiceVersion to V2 for
   all requests of Statestore and Catalog services.
 - Update the request and response structs in CatalogService to ensure
   each Thrift request struct has protocol version and each Thrift
   response struct has returned status.
 - Update the request and response struct in StatestoreService to
   ensure each Thrift request struct has protocol version and each
   Thrift response struct has returned status.
 - Add subscriber type so that statestore could distinguish different
   types of subscribers.
 - Statestore checks protocol version for registration requests from
   subscribers. It refuses the requests with incompatible version.
 - Catalog server checks protocol version for Catalog service APIs, and
   returns error for requests with incompatible version.
 - Catalog daemon sends its address and the protocol version of Catalog
   service when it registers to statestore, statestore forwards the
   address and the protocol version of Catalog service to all
   subscribers during registration.
 - Add UpdateCatalogd API for StatestoreSubscriber service so that the
   coordinators could receive the address and the protocol version of
   Catalog service from statestore if the coordinators register to
   statestore before catalog daemon.
 - Add GetProtocolVersion API for Statestore service so that the
   subscribers can check the protocol version of statestore before
   calling RegisterSubscriber API.
 - Add starting flag tolerate_statestore_startup_delay. It is off by
   default. When it's enabled, the subscriber is able to tolerate
   the delay of the statestore's availability. The subscriber's
   process will not exit if it cannot register with the specified
   statestore on startup. But instead it enter into Recovery mode,
   it will loop, sleep and retry till it successfully register with
   the statestore. This flag should be enabled during rolling upgrade.

CatalogServiceVersion is defined in CatalogService.thrift. In future,
if we make non backward version compatible changes in the request or
response structures for CatalogService APIs, we need to bump the
protocol version of Catalog service.
StatestoreServiceVersion is defined in StatestoreService.thrift.
Similarly if we make non backward version compatible changes in the
request or response structures for StatestoreService APIs, we need
to bump the protocol version of Statestore service.

Message formats for KRPC communications between coordinators and
executors, and between admissiond and coordinators are defined
in proto files under common/protobuf. If we make non backward version
compatible changes in these structures, we need to bump the
protocol version of Statestore service.

Testing:
 - Added end-to-end unit tests.
 - Passed the core tests.
 - Ran manual test to verify old version of executors cannot register
   with new version of statestore, and new version of executors cannot
   register with old version of statestore.

Change-Id: If61506dab38c4d1c50419c1b3f7bc4f9ee3676bc
Reviewed-on: http://gerrit.cloudera.org:8080/19959
Reviewed-by: Andrew Sherman <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
  • Loading branch information
wzhou-code committed Jun 21, 2023
1 parent 6b571eb commit a828308
Show file tree
Hide file tree
Showing 28 changed files with 1,748 additions and 361 deletions.
72 changes: 59 additions & 13 deletions be/src/catalog/catalog-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
// Executes a TDdlExecRequest and returns details on the result of the operation.
void ExecDdl(TDdlExecResponse& resp, const TDdlExecRequest& req) override {
VLOG_RPC << "ExecDdl(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->ExecDdl(req, &resp);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->ExecDdl(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -189,7 +192,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
override {
VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
DebugActionNoFail(FLAGS_debug_actions, "RESET_METADATA_DELAY");
Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->ResetMetadata(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -202,7 +208,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void UpdateCatalog(TUpdateCatalogResponse& resp, const TUpdateCatalogRequest& req)
override {
VLOG_RPC << "UpdateCatalog(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->UpdateCatalog(req, &resp);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->UpdateCatalog(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -215,7 +224,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void GetFunctions(TGetFunctionsResponse& resp, const TGetFunctionsRequest& req)
override {
VLOG_RPC << "GetFunctions(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetFunctions(req, &resp);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->GetFunctions(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -227,9 +239,15 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void GetCatalogObject(TGetCatalogObjectResponse& resp,
const TGetCatalogObjectRequest& req) override {
VLOG_RPC << "GetCatalogObject(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetCatalogObject(req.object_desc,
&resp.catalog_object);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->GetCatalogObject(
req.object_desc, &resp.catalog_object);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
resp.__set_status(thrift_status);
VLOG_RPC << "GetCatalogObject(): response=" << ThriftDebugStringNoThrow(resp);
}

Expand All @@ -243,7 +261,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
// so a heavy query workload against a table undergoing a slow refresh doesn't
// end up taking down the catalog by creating thousands of threads.
VLOG_RPC << "GetPartialCatalogObject(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -254,7 +275,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void GetPartitionStats(TGetPartitionStatsResponse& resp,
const TGetPartitionStatsRequest& req) override {
VLOG_RPC << "GetPartitionStats(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetPartitionStats(req, &resp);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->GetPartitionStats(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -268,7 +292,10 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void PrioritizeLoad(TPrioritizeLoadResponse& resp, const TPrioritizeLoadRequest& req)
override {
VLOG_RPC << "PrioritizeLoad(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->PrioritizeLoad(req);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->PrioritizeLoad(req);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
Expand All @@ -279,16 +306,33 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
void UpdateTableUsage(TUpdateTableUsageResponse& resp,
const TUpdateTableUsageRequest& req) override {
VLOG_RPC << "UpdateTableUsage(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->UpdateTableUsage(req);
Status status = CheckProtocolVersion(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->UpdateTableUsage(req);
}
if (!status.ok()) LOG(WARNING) << status.GetDetail();
TStatus thrift_status;
status.ToThrift(&thrift_status);
resp.__set_status(thrift_status);
VLOG_RPC << "UpdateTableUsage(): response.status=" << resp.status;
}

private:
CatalogServer* catalog_server_;

Status CheckProtocolVersion(CatalogServiceVersion::type client_version) {
Status status = Status::OK();
if (client_version < catalog_server_->GetProtocolVersion()) {
status = Status(TErrorCode::CATALOG_INCOMPATIBLE_PROTOCOL, client_version + 1,
catalog_server_->GetProtocolVersion() + 1);
}
return status;
}
};

CatalogServer::CatalogServer(MetricGroup* metrics)
: thrift_iface_(new CatalogServiceThriftIf(this)),
: protocol_version_(CatalogServiceVersion::V2),
thrift_iface_(new CatalogServiceThriftIf(this)),
thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
topic_updates_ready_(false), last_sent_catalog_version_(0L),
catalog_objects_max_version_(0L) {
Expand Down Expand Up @@ -319,9 +363,10 @@ Status CatalogServer::Start() {
RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-metrics-refresh-thread",
&CatalogServer::RefreshMetrics, this, &catalog_metrics_refresh_thread_));

statestore_subscriber_.reset(new StatestoreSubscriber(
statestore_subscriber_.reset(new StatestoreSubscriberCatalog(
Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
subscriber_address, statestore_address, metrics_));
subscriber_address, statestore_address, metrics_, protocol_version_,
server_address));

StatestoreSubscriber::UpdateCallback cb =
bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
Expand All @@ -337,6 +382,7 @@ Status CatalogServer::Start() {
status.AddDetail("CatalogService failed to start");
return status;
}

RETURN_IF_ERROR(statestore_subscriber_->Start());

// Notify the thread to start for the first time.
Expand Down
16 changes: 12 additions & 4 deletions be/src/catalog/catalog-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
#include <boost/shared_ptr.hpp>
#include <boost/unordered_set.hpp>

#include "catalog/catalog.h"
#include "gen-cpp/CatalogService.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h"
#include "catalog/catalog.h"
#include "kudu/util/web_callback_registry.h"
#include "statestore/statestore-subscriber.h"
#include "rapidjson/rapidjson.h"
#include "statestore/statestore-subscriber-catalog.h"
#include "util/condition-variable.h"
#include "util/metrics-fwd.h"
#include "rapidjson/rapidjson.h"

using kudu::HttpStatusCode;

Expand Down Expand Up @@ -87,7 +87,15 @@ class CatalogServer {
/// service has started.
void MarkServiceAsStarted();

// Returns the protocol version of catalog service.
CatalogServiceVersion::type GetProtocolVersion() const {
return protocol_version_;
}

private:
/// Protocol version of the Catalog Service.
CatalogServiceVersion::type protocol_version_;

/// Indicates whether the catalog service is ready.
std::atomic_bool service_started_{false};

Expand All @@ -96,7 +104,7 @@ class CatalogServer {
ThriftSerializer thrift_serializer_;
MetricGroup* metrics_;
boost::scoped_ptr<Catalog> catalog_;
boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
boost::scoped_ptr<StatestoreSubscriberCatalog> statestore_subscriber_;

/// Metric that tracks the amount of time taken preparing a catalog update.
StatsMetric<double>* topic_processing_time_metric_;
Expand Down
9 changes: 9 additions & 0 deletions be/src/common/global-flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ DEFINE_int32(stress_catalog_startup_delay_ms, 0, "A stress option that injects e
"catalogd opens ports or accepts connections. Delay <= 0 injects no delay.");
DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra delay"
" in milliseconds when the I/O manager is reading from disk.");
DEFINE_int32(stress_statestore_startup_delay_ms, 0, "A stress option that injects extra "
"delay in milliseconds during the startup of statestore. The delay is before the "
"statestored opens ports or accepts connections. Delay <= 0 injects no delay.");
#endif

DEFINE_string(debug_actions, "", "For testing only. Uses the same format as the debug "
Expand Down Expand Up @@ -375,6 +378,12 @@ DEFINE_bool(pull_table_types_and_comments, false,
"catalogd-only flag. Required if users want GET_TABLES requests return correct table "
"types or comments.");

DEFINE_bool(tolerate_statestore_startup_delay, false, "If set to true, the subscriber "
"is able to tolerate the delay of the statestore's availability. The subscriber's "
"process will not exit if it cannot register with the specified statestore on "
"startup. But instead it enters into Recovery mode, it will loop, sleep and retry "
"till it successfully registers with the statestore.");

// TGeospatialLibrary's values are mapped here as constants
static const string geo_lib_none = "NONE";
static const string geo_lib_hive_esri = "HIVE_ESRI";
Expand Down
35 changes: 14 additions & 21 deletions be/src/exec/catalog-op-executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;

DECLARE_bool(use_local_catalog);
DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);
DECLARE_string(debug_actions);

DECLARE_int32(catalog_client_connection_num_retries);
Expand Down Expand Up @@ -96,8 +94,6 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
DCHECK(profile_ != NULL);
RuntimeProfile::Counter* exec_timer = ADD_TIMER(profile_, "CatalogOpExecTimer");
SCOPED_TIMER(exec_timer);
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
RETURN_IF_ERROR(status);
switch (request.op_type) {
case TCatalogOpType::DDL: {
Expand All @@ -107,7 +103,8 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
exec_response_.reset(new TDdlExecResponse());
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::ExecDdl, request.ddl_params,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand All @@ -130,7 +127,8 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
TResetMetadataResponse response;
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::ResetMetadata, request.reset_metadata_params,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand Down Expand Up @@ -344,15 +342,14 @@ void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema,

Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc,
TCatalogObject* result) {
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
TGetCatalogObjectRequest request;
request.__set_object_desc(object_desc);

TGetCatalogObjectResponse response;
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::GetCatalogObject, request,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand All @@ -366,14 +363,13 @@ Status CatalogOpExecutor::GetPartialCatalogObject(
const TGetPartialCatalogObjectRequest& req,
TGetPartialCatalogObjectResponse* resp) {
DCHECK(FLAGS_use_local_catalog || TestInfo::is_test());
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
if (FLAGS_inject_latency_before_catalog_fetch_ms > 0) {
SleepForMs(FLAGS_inject_latency_before_catalog_fetch_ms);
}
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::GetPartialCatalogObject, req,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand All @@ -388,11 +384,10 @@ Status CatalogOpExecutor::GetPartialCatalogObject(

Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,
TPrioritizeLoadResponse* result) {
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::PrioritizeLoad, req,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand All @@ -403,11 +398,10 @@ Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,

Status CatalogOpExecutor::GetPartitionStats(
const TGetPartitionStatsRequest& req, TGetPartitionStatsResponse* result) {
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::GetPartitionStats, req,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand All @@ -418,11 +412,10 @@ Status CatalogOpExecutor::GetPartitionStats(

Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
TUpdateTableUsageResponse* resp) {
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::UpdateTableUsage, req,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
Expand Down
3 changes: 3 additions & 0 deletions be/src/rpc/thrift-server-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class DummyStatestoreService : public StatestoreServiceIf {
virtual void RegisterSubscriber(TRegisterSubscriberResponse& response,
const TRegisterSubscriberRequest& request) {
}
virtual void GetProtocolVersion(TGetProtocolVersionResponse& response,
const TGetProtocolVersionRequest& request) {
}
};

std::shared_ptr<TProcessor> MakeProcessor() {
Expand Down
Loading

0 comments on commit a828308

Please sign in to comment.