Skip to content

Commit

Permalink
do not use proxy user
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 17, 2023
1 parent 88fae49 commit 7340401
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public OperationLog getBatchLocalLog(String batchId, int from, int size) {
return this.getClient().get(path, params, OperationLog.class, client.getAuthHeader());
}

/**
* hs2ProxyUser for delete batch is deprecated since 1.8.1,
* please use {@link #deleteBatch(String)} instead.
*/
@Deprecated
public CloseBatchResponse deleteBatch(String batchId, String hs2ProxyUser) {
Map<String, Object> params = new HashMap<>();
params.put("hive.server2.proxy.user", hs2ProxyUser);
Expand All @@ -109,6 +114,11 @@ public CloseBatchResponse deleteBatch(String batchId, String hs2ProxyUser) {
return this.getClient().delete(path, params, CloseBatchResponse.class, client.getAuthHeader());
}

public CloseBatchResponse deleteBatch(String batchId) {
String path = String.format("%s/%s", API_BASE_PATH, batchId);
return this.getClient().delete(path, null, CloseBatchResponse.class, client.getAuthHeader());
}

private IRestClient getClient() {
return this.client.getHttpClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,19 +448,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
description = "close and cancel a batch session")
@DELETE
@Path("{batchId}")
def closeBatchSession(
@PathParam("batchId") batchId: String,
@QueryParam("proxyUser") kyuubiProxyUser: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {

def checkPermission(operator: String, owner: String): Unit = {
if (operator != owner) {
throw new WebApplicationException(
s"$operator is not allowed to close the session belong to $owner",
Status.METHOD_NOT_ALLOWED)
}
}

def closeBatchSession(@PathParam("batchId") batchId: String): CloseBatchResponse = {
def forceKill(
appMgrInfo: ApplicationManagerInfo,
batchId: String,
Expand All @@ -472,18 +460,15 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
(killed, message)
}

val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser)
val userName = fe.getSessionUser(activeProxyUser)

val sessionHandle = formatSessionHandle(batchId)
sessionManager.getBatchSession(sessionHandle).map { batchSession =>
checkPermission(userName, batchSession.user)
fe.getSessionUser(batchSession.user)
sessionManager.closeSession(batchSession.handle)
val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(killed, msg)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
checkPermission(userName, metadata.username)
fe.getSessionUser(metadata.username)
if (OperationState.isTerminal(OperationState.withName(metadata.state))) {
new CloseBatchResponse(false, s"The batch[$metadata] has been terminated.")
} else if (batchV2Enabled(metadata.requestConf) && metadata.state == "INITIALIZED" &&
Expand All @@ -494,21 +479,21 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
} else if (batchV2Enabled(metadata.requestConf) && metadata.kyuubiInstance == null) {
// code goes here indicates metadata is outdated, recursively calls itself to refresh
// the metadata
closeBatchSession(batchId, kyuubiProxyUser, hs2ProxyUser)
closeBatchSession(batchId)
} else if (metadata.kyuubiInstance != fe.connectionUrl) {
info(s"Redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}")
val internalRestClient = getInternalRestClient(metadata.kyuubiInstance)
try {
internalRestClient.deleteBatch(userName, batchId)
internalRestClient.deleteBatch(metadata.username, batchId)
} catch {
case e: KyuubiRestException =>
error(s"Error redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}", e)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, metadata.username)
new CloseBatchResponse(killed, if (killed) msg else Utils.stringifyException(e))
}
} else { // should not happen, but handle this for safe
warn(s"Something wrong on deleting batch[$batchId], try forcibly killing application")
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, metadata.username)
new CloseBatchResponse(killed, msg)
}
}.getOrElse {
Expand Down

0 comments on commit 7340401

Please sign in to comment.