Skip to content

Commit

Permalink
IMPALA-12785: Add commands to control event-processor status
Browse files Browse the repository at this point in the history
This patch extends the existing AdminFnStmt to support operations on
EventProcessor. E.g. to pause the EventProcessor:
  impala-shell> :event_processor('pause');
to resume the EventProcessor:
  impala-shell> :event_processor('start');
Or to resume the EventProcessor on a given event id (1000):
  impala-shell> :event_processor('start', 1000);
Admin can also resume the EventProcessor at the latest event id by using
-1:
  impala-shell> :event_processor('start', -1);

Supported command actions in this patch: pause, start, status.

The command output of all actions will show the latest status of
EventProcessor, including
 - EventProcessor status:
   PAUSED / ACTIVE / ERROR / NEEDS_INVALIDATE / STOPPED / DISABLED.
 - LastSyncedEventId: The last HMS event id which we have synced to.
 - LatestEventId: The event id of the latest event in HMS.

Example output:
[localhost:21050] default> :event_processor('pause');
+--------------------------------------------------------------------------------+
| summary                                                                        |
+--------------------------------------------------------------------------------+
| EventProcessor status: PAUSED. LastSyncedEventId: 34489. LatestEventId: 34489. |
+--------------------------------------------------------------------------------+
Fetched 1 row(s) in 0.01s

If authorization is enabled, only admin users that have ALL privilege on
SERVER can run this command.

Note that there is a restriction in MetastoreEventsProcessor#start(long)
that resuming EventProcessor back to a previous event id is only allowed
when it's not in the ACTIVE state. This patch aims to expose the control
of EventProcessor to the users so MetastoreEventsProcessor is not
changed. We can investigate the restriction and see if we want to relax
it.

Note that resuming EventProcessor at a newer event id can be done on any
states. Admins can use this to manually resolve the lag of HMS event
processing, after they have made sure all (or important) tables are
manually invalidated/refreshed.

A new catalogd RPC, SetEventProcessorStatus, is added for coordinators
to control the status of EventProcessor.

Tests
 - Added e2e tests

Change-Id: I5a19f67264cfe06a1819a22c0c4f0cf174c9b958
Reviewed-on: http://gerrit.cloudera.org:8080/22250
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
  • Loading branch information
stiga-huang authored and Impala Public Jenkins committed Jan 24, 2025
1 parent c5b474d commit 2e59bba
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 15 deletions.
11 changes: 11 additions & 0 deletions be/src/catalog/catalog-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,17 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
VLOG_RPC << "GetLatestCompactions(): response=" << ThriftDebugStringNoThrow(resp);
}

void SetEventProcessorStatus(TSetEventProcessorStatusResponse& resp,
const TSetEventProcessorStatusRequest& req) override {
VLOG_RPC << "SetEventProcessorStatus(): request=" << ThriftDebugString(req);
Status status = AcceptRequest(req.protocol_version);
if (status.ok()) {
status = catalog_server_->catalog()->SetEventProcessorStatus(req, &resp);
}
if (!status.ok()) LOG(ERROR) << status.GetDetail();
VLOG_RPC << "SetEventProcessorStatus(): response=" << ThriftDebugStringNoThrow(resp);
}

private:
CatalogServer* catalog_server_;
string server_address_;
Expand Down
8 changes: 8 additions & 0 deletions be/src/catalog/catalog-service-client-wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
*send_done = true;
recv_GetLatestCompactions(_return);
}

void SetEventProcessorStatus(TSetEventProcessorStatusResponse& _return,
const TSetEventProcessorStatusRequest& req, bool* send_done) {
DCHECK(!*send_done);
send_SetEventProcessorStatus(req);
*send_done = true;
recv_SetEventProcessorStatus(_return);
}
#pragma clang diagnostic pop
};

Expand Down
6 changes: 6 additions & 0 deletions be/src/catalog/catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Catalog::Catalog() {
{"getNullPartitionName", "()[B", &get_null_partition_name_id_},
{"getLatestCompactions", "([B)[B", &get_latest_compactions_id_},
{"getAllHadoopConfigs", "()[B", &get_hadoop_configs_id_},
{"setEventProcessorStatus", "([B)[B", &set_event_processor_status_id_},
};

JNIEnv* jni_env = JniUtil::GetJNIEnv();
Expand Down Expand Up @@ -238,3 +239,8 @@ Status Catalog::GetLatestCompactions(
const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* resp) {
return JniUtil::CallJniMethod(catalog_, get_latest_compactions_id_, req, resp);
}

Status Catalog::SetEventProcessorStatus(
const TSetEventProcessorStatusRequest& req, TSetEventProcessorStatusResponse* resp) {
return JniUtil::CallJniMethod(catalog_, set_event_processor_status_id_, req, resp);
}
5 changes: 5 additions & 0 deletions be/src/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ class Catalog {
/// Returns all Hadoop configurations in key, value form in result.
Status GetAllHadoopConfigs(TGetAllHadoopConfigsResponse* result);

/// Update the status of EventProcessor.
Status SetEventProcessorStatus(
const TSetEventProcessorStatusRequest& req, TSetEventProcessorStatusResponse* resp);

private:
jobject catalog_; // instance of org.apache.impala.service.JniCatalog
jmethodID update_metastore_id_; // JniCatalog.updateMetaastore()
Expand Down Expand Up @@ -182,6 +186,7 @@ class Catalog {
jmethodID get_null_partition_name_id_; // JniCatalog.getNullPartitionName()
jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions()
jmethodID get_hadoop_configs_id_; // JniCatalog.getAllHadoopConfigs()
jmethodID set_event_processor_status_id_; // JniCatalog.setEventProcessorStatus()
};

}
Expand Down
16 changes: 16 additions & 0 deletions be/src/exec/catalog-op-executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,19 @@ Status CatalogOpExecutor::GetLatestCompactions(
RETURN_IF_ERROR(rpc_status.status);
return Status::OK();
}

Status CatalogOpExecutor::SetEventProcessorStatus(
const TSetEventProcessorStatusRequest& req,
TSetEventProcessorStatusResponse* result) {
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress(),
&CatalogServiceClientWrapper::SetEventProcessorStatus, req,
FLAGS_catalog_client_connection_num_retries,
FLAGS_catalog_client_rpc_retry_interval_ms,
[&attempt]() { return CatalogRpcDebugFn(&attempt); }, result);
RETURN_IF_ERROR(rpc_status.status);
if (result->status.status_code != TErrorCode::OK) return Status(result->status);
return Status::OK();
}
4 changes: 4 additions & 0 deletions be/src/exec/catalog-op-executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class CatalogOpExecutor {
Status GetLatestCompactions(
const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result);

/// Makes an RPC to the catalog server to update the status of EventProcessor.
Status SetEventProcessorStatus(const TSetEventProcessorStatusRequest& req,
TSetEventProcessorStatusResponse* result);

/// Set in Exec(), returns a pointer to the TDdlExecResponse of the DDL execution.
/// If called before Exec(), this will return NULL. Only set if the
/// TCatalogOpType is DDL.
Expand Down
27 changes: 25 additions & 2 deletions be/src/service/client-request-state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,13 @@ Status ClientRequestState::Exec() {
break;
}
case TStmtType::ADMIN_FN:
DCHECK(exec_req.admin_request.type == TAdminRequestType::SHUTDOWN);
RETURN_IF_ERROR(ExecShutdownRequest());
if (exec_req.admin_request.type == TAdminRequestType::SHUTDOWN) {
RETURN_IF_ERROR(ExecShutdownRequest());
} else if (exec_req.admin_request.type == TAdminRequestType::EVENT_PROCESSOR) {
RETURN_IF_ERROR(ExecEventProcessorCmd());
} else {
DCHECK(false);
}
break;
case TStmtType::CONVERT:
DCHECK(exec_req.__isset.convert_table_request);
Expand Down Expand Up @@ -1094,6 +1099,24 @@ Status ClientRequestState::ExecShutdownRequest() {
return Status::OK();
}

Status ClientRequestState::ExecEventProcessorCmd() {
catalog_op_executor_.reset(
new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
const TEventProcessorCmdParams& params =
exec_request().admin_request.event_processor_cmd_params;
TSetEventProcessorStatusRequest request;
TSetEventProcessorStatusResponse response;
request.__set_params(params);
request.__set_header(GetCatalogServiceRequestHeader());
Status rpc_status = catalog_op_executor_->SetEventProcessorStatus(request, &response);
if (!rpc_status.ok()) {
VLOG_QUERY << "SetEventProcessorStatus failed: " << rpc_status.msg().msg();
return rpc_status;
}
SetResultSet({response.info});
return Status::OK();
}

void ClientRequestState::Finalize(const Status* cause) {
UnRegisterCompletedRPCs();
Cancel(cause, /*wait_until_finalized=*/true);
Expand Down
3 changes: 3 additions & 0 deletions be/src/service/client-request-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ class ClientRequestState {
/// Executes a shut down request.
Status ExecShutdownRequest() WARN_UNUSED_RESULT;

/// Executes a command on EventProcessor.
Status ExecEventProcessorCmd() WARN_UNUSED_RESULT;

/// Core logic of Wait(). Does not update operation_state_/query_status_.
Status WaitInternal() WARN_UNUSED_RESULT;

Expand Down
22 changes: 22 additions & 0 deletions common/thrift/CatalogService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,24 @@ struct TUpdateTableUsageResponse {
1: optional Status.TStatus status
}

struct TEventProcessorCmdParams {
// See allowed actions in Java enum MetastoreEventsProcessor.EventProcessorCmdType.
// Use string type instead of enum to avoid incompatible thrift changes in the future.
1: required string action
2: optional i64 event_id
}

struct TSetEventProcessorStatusRequest {
1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
2: optional TCatalogServiceRequestHeader header
3: required TEventProcessorCmdParams params
}

struct TSetEventProcessorStatusResponse {
1: required Status.TStatus status
2: optional string info
}

// The CatalogService API
service CatalogService {
// Executes a DDL request and returns details on the result of the operation.
Expand Down Expand Up @@ -794,4 +812,8 @@ service CatalogService {

// Gets the latest compactions.
TGetLatestCompactionsResponse GetLatestCompactions(1: TGetLatestCompactionsRequest req);

// Update the status of EventProcessor.
TSetEventProcessorStatusResponse SetEventProcessorStatus(
1: TSetEventProcessorStatusRequest req);
}
2 changes: 2 additions & 0 deletions common/thrift/Frontend.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ struct TShutdownParams {
// The type of administrative function to be executed.
enum TAdminRequestType {
SHUTDOWN = 0
EVENT_PROCESSOR = 1
}

// Parameters for administrative function statement. This is essentially a tagged union
Expand All @@ -565,6 +566,7 @@ struct TAdminRequest {

// The below member corresponding to 'type' should be set.
2: optional TShutdownParams shutdown_params
3: optional CatalogService.TEventProcessorCmdParams event_processor_cmd_params
}

// HiveServer2 Metadata operations (JniFrontend.hiveServer2MetadataOperation)
Expand Down
65 changes: 52 additions & 13 deletions fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TAdminRequest;
import org.apache.impala.thrift.TAdminRequestType;
import org.apache.impala.thrift.TEventProcessorCmdParams;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TShutdownParams;

Expand All @@ -36,8 +37,8 @@
* Represents an administrative function call, e.g. ": shutdown('hostname:123')".
*
* This "admin statement" framework provides a way to expand the set of supported admin
* statements without modifying the SQL grammar. For now, the only supported function is
* shutdown(), so the logic in here is not generic.
* statements without modifying the SQL grammar. For now, the only supported functions are
* shutdown() and event_processor(), so the logic in here is not generic.
*/
public class AdminFnStmt extends StatementBase {
// Name of the function. Validated during analysis.
Expand All @@ -46,6 +47,8 @@ public class AdminFnStmt extends StatementBase {
// Arguments to the function. Always non-null.
private final List<Expr> params_;

private TAdminRequestType type_;

// Parameters for the shutdown() command.
// Address of the backend to shut down, If 'backend_' is null, that means the current
// server. If 'backend_.port' is 0, we assume the backend has the same port as this
Expand All @@ -54,6 +57,13 @@ public class AdminFnStmt extends StatementBase {
// Deadline in seconds. -1 if no deadline specified.
private long deadlineSecs_;

// Parameters for the event_processor() command
// Currently supported actions: pause, start
private String action_;
// Event id to start at. Defaults to reusing the last synced event id.
// Set to -1 for using the latest event id.
private long event_id_ = 0;

public AdminFnStmt(String fnName, List<Expr> params) {
this.fnName_ = fnName;
this.params_ = params;
Expand All @@ -72,37 +82,51 @@ public String toSql(ToSqlOptions options) {

public TAdminRequest toThrift() throws InternalException {
TAdminRequest result = new TAdminRequest();
result.type = TAdminRequestType.SHUTDOWN;
result.shutdown_params = new TShutdownParams();
if (backend_ != null) result.shutdown_params.setBackend(backend_);
if (deadlineSecs_ != -1) result.shutdown_params.setDeadline_s(deadlineSecs_);
result.type = type_;
if (type_ == TAdminRequestType.SHUTDOWN) {
result.shutdown_params = new TShutdownParams();
if (backend_ != null) result.shutdown_params.setBackend(backend_);
if (deadlineSecs_ != -1) result.shutdown_params.setDeadline_s(deadlineSecs_);
} else if (type_ == TAdminRequestType.EVENT_PROCESSOR) {
result.event_processor_cmd_params = new TEventProcessorCmdParams(action_);
if (event_id_ != 0) result.event_processor_cmd_params.setEvent_id(event_id_);
} else {
Preconditions.checkState(false, "Unsupported TAdminRequest type %s", type_);
}
return result;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
for (Expr param : params_) param.analyze(analyzer);
// Only shutdown is supported.
if (fnName_.toLowerCase().equals("shutdown")) {
if (fnName_.equalsIgnoreCase("shutdown")) {
type_ = TAdminRequestType.SHUTDOWN;
analyzeShutdown(analyzer);
} else if (fnName_.equalsIgnoreCase("event_processor")) {
type_ = TAdminRequestType.EVENT_PROCESSOR;
analyzeEventProcessorCmd(analyzer);
} else {
throw new AnalysisException("Unknown admin function: " + fnName_);
}
}

/**
* Supports optionally specifying the backend and the deadline: either shutdown(),
* shutdown('host:port'), shutdown(deadline), shutdown('host:port', deadline).
*/
private void analyzeShutdown(Analyzer analyzer) throws AnalysisException {
private void registerPrivReq(Analyzer analyzer) {
AuthorizationConfig authzConfig = analyzer.getAuthzConfig();
if (authzConfig.isEnabled()) {
// Only admins (i.e. user with ALL privilege on server) can execute admin functions.
String authzServer = authzConfig.getServerName();
Preconditions.checkNotNull(authzServer);
analyzer.registerPrivReq(builder -> builder.onServer(authzServer).all().build());
}
}

/**
* Supports optionally specifying the backend and the deadline: either shutdown(),
* shutdown('host:port'), shutdown(deadline), shutdown('host:port', deadline).
*/
private void analyzeShutdown(Analyzer analyzer) throws AnalysisException {
registerPrivReq(analyzer);

// TODO: this parsing and type checking logic is specific to the command, similar to
// handling of other top-level commands. If we add a lot more of these functions we
Expand Down Expand Up @@ -162,4 +186,19 @@ private TNetworkAddress parseBackendAddress(String backend) throws AnalysisExcep
}
return result;
}

private void analyzeEventProcessorCmd(Analyzer analyzer) throws AnalysisException {
registerPrivReq(analyzer);

if (params_.isEmpty() || params_.size() > 2) {
throw new AnalysisException("event_processor() takes 1 or 2 arguments: " + toSql());
}
if (!(params_.get(0) instanceof StringLiteral)) {
throw new AnalysisException("First argument of event_processor() should be STRING");
}
action_ = ((StringLiteral)params_.get(0)).getStringValue().toUpperCase();
if (params_.size() > 1) {
event_id_ = params_.get(1).evalToInteger(analyzer, "event_id");
}
}
}
Loading

0 comments on commit 2e59bba

Please sign in to comment.