-
Notifications
You must be signed in to change notification settings - Fork 924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RESTful API supports killing engine forcibly #6008
Changes from 23 commits
5faa5b5
f3ab9c5
d6f82ff
8a65cf1
f11e765
51827ec
070aad0
fb9b251
bd7bb45
632c56b
3ad9577
5062206
513bcdc
ab31382
634ceb6
ba57c2c
11106d7
9bacc2c
936a54e
ae24ea7
3a2f597
a13466e
f826d05
0cdeede
b013194
6d5d087
72d7df3
5e1b6a1
cd5129d
efc7587
8721a2d
00c208a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,13 @@ | |
|
||
package org.apache.kyuubi.engine | ||
|
||
import java.nio.charset.StandardCharsets | ||
import java.util.Base64 | ||
|
||
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} | ||
import com.fasterxml.jackson.module.scala.DefaultScalaModule | ||
|
||
import org.apache.kyuubi.Logging | ||
import org.apache.kyuubi.config.KyuubiConf | ||
import org.apache.kyuubi.engine.ApplicationState.ApplicationState | ||
|
||
|
@@ -132,8 +139,11 @@ case class ApplicationManagerInfo( | |
resourceManager: Option[String], | ||
kubernetesInfo: KubernetesInfo = KubernetesInfo()) | ||
|
||
object ApplicationManagerInfo { | ||
object ApplicationManagerInfo extends Logging { | ||
final val DEFAULT_KUBERNETES_NAMESPACE = "default" | ||
val mapper: ObjectMapper = new ObjectMapper() | ||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) | ||
.registerModule(DefaultScalaModule) | ||
|
||
def apply( | ||
resourceManager: Option[String], | ||
|
@@ -143,4 +153,21 @@ object ApplicationManagerInfo { | |
resourceManager, | ||
KubernetesInfo(kubernetesContext, kubernetesNamespace)) | ||
} | ||
|
||
def serialize(appMgrInfo: ApplicationManagerInfo): String = { | ||
Base64.getEncoder.encodeToString( | ||
mapper.writeValueAsString(appMgrInfo).getBytes(StandardCharsets.UTF_8)) | ||
} | ||
|
||
def deserialize(encodedStr: String): ApplicationManagerInfo = { | ||
try { | ||
info(s"The original string encoded:$encodedStr") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to catch block. we only need to print the base64 string when something goes wrong There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. Done |
||
val json = new String( | ||
Base64.getDecoder.decode(encodedStr.getBytes), | ||
StandardCharsets.UTF_8) | ||
mapper.readValue(json, classOf[ApplicationManagerInfo]) | ||
} catch { | ||
case _: Throwable => ApplicationManagerInfo(None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. log the original encodedStr There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Thanks. |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,15 +31,16 @@ import org.apache.commons.lang3.StringUtils | |
|
||
import org.apache.kyuubi.{KYUUBI_VERSION, Logging} | ||
import org.apache.kyuubi.client.api.v1.dto._ | ||
import org.apache.kyuubi.config.KyuubiConf | ||
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} | ||
import org.apache.kyuubi.config.KyuubiConf._ | ||
import org.apache.kyuubi.engine.ApplicationManagerInfo | ||
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE | ||
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo} | ||
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient | ||
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle} | ||
import org.apache.kyuubi.server.KyuubiServer | ||
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils} | ||
import org.apache.kyuubi.session.{KyuubiSession, SessionHandle} | ||
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionManager, SessionHandle} | ||
|
||
@Tag(name = "Admin") | ||
@Produces(Array(MediaType.APPLICATION_JSON)) | ||
|
@@ -277,7 +278,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { | |
@QueryParam("sharelevel") shareLevel: String, | ||
@QueryParam("subdomain") subdomain: String, | ||
@QueryParam("proxyUser") kyuubiProxyUser: String, | ||
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = { | ||
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String, | ||
@QueryParam("kill") @DefaultValue("false") kill: Boolean): Response = { | ||
val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser) | ||
val userName = if (fe.isAdministrator(fe.getRealUser())) { | ||
Option(activeProxyUser).getOrElse(fe.getRealUser()) | ||
|
@@ -286,24 +288,38 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { | |
} | ||
val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "default") | ||
val engineSpace = calculateEngineSpace(engine) | ||
val responseMsgBuilder = new StringBuilder() | ||
|
||
withDiscoveryClient(fe.getConf) { discoveryClient => | ||
wForget marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val engineNodes = discoveryClient.getChildren(engineSpace) | ||
engineNodes.foreach { node => | ||
val nodePath = s"$engineSpace/$node" | ||
val engineNodes = discoveryClient.getServiceNodesInfo(engineSpace, silent = true) | ||
engineNodes.foreach { engineNode => | ||
val nodePath = s"$engineSpace/${engineNode.nodeName}" | ||
val engineRefId = engineNode.engineRefId.orNull | ||
info(s"Deleting engine node:$nodePath") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the late comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. except for CONNECTION level engine, we cannot guarantee that there are no other engines access the same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean there can be multiple engines under same subdomain? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. Even in GROUP level, we can only create one engine per subdomain, right. And subdomain has been included in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there are some corner cases. for example, if the engine has no response due to full GC or overload, kyuubi will create a new one on the same engine space. I also see sometimes the dist lock does not work properly(which should not, may be bugs), that also causes multi engines live in the same subdomain. finally, in theory, there are race conditions for kyuubi server deleting and engine registering under the same subdomain. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation! |
||
try { | ||
discoveryClient.delete(nodePath) | ||
responseMsgBuilder | ||
.append(s"Engine $engineSpace refId=$engineRefId is deleted successfully.") | ||
} catch { | ||
case e: Exception => | ||
error(s"Failed to delete engine node:$nodePath", e) | ||
throw new NotFoundException(s"Failed to delete engine node:$nodePath," + | ||
s"${e.getMessage}") | ||
} | ||
|
||
if (kill && engineRefId != null) { | ||
val appMgrInfo = | ||
engineNode.attributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO) | ||
.map(ApplicationManagerInfo.deserialize).getOrElse(ApplicationManagerInfo(None)) | ||
val killResponse = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] | ||
.applicationManager.killApplication(appMgrInfo, engineRefId) | ||
responseMsgBuilder | ||
.append(s"\nKilled engine with $appMgrInfo/$engineRefId: $killResponse") | ||
} | ||
} | ||
} | ||
|
||
Response.ok(s"Engine $engineSpace is deleted successfully.").build() | ||
Response.ok(responseMsgBuilder.toString()).build() | ||
} | ||
|
||
@ApiResponse( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
restore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Format issue. Restored.