diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 0dc6692da43..d24387341e6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -42,6 +42,7 @@ import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils} import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay class KyuubiSyncThriftClient private ( + val hostPort: (String, Int), protocol: TProtocol, engineAliveProbeProtocol: Option[TProtocol], engineAliveProbeInterval: Long, @@ -483,6 +484,7 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging { None } new KyuubiSyncThriftClient( + (host, port), tProtocol, aliveProbeProtocol, aliveProbeInterval, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index ae32b04f0f1..eb9c7ab47c9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -38,7 +38,7 @@ import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.engine.trino.TrinoProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE} -import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths} +import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths, ServiceNodeInfo} import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.log.OperationLog @@ -337,6 +337,13 @@ private[kyuubi] class EngineRef( } } + def getServiceNode( + discoveryClient: DiscoveryClient, + hostPort: (String, Int)): Option[ServiceNodeInfo] = { + val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace) + serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption + } + def close(): Unit = { if (shareLevel == CONNECTION && builder != null) { try { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index a5d160e0714..e34f7b2a06d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -30,6 +30,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KE import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager} import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.ha.client.DiscoveryClientProvider._ +import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.{Operation, OperationHandle} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.service.authentication.InternalSecurityAccessor @@ -119,6 +120,12 @@ class KyuubiSessionImpl( engineLastAlive = System.currentTimeMillis() } + def getEngineNode: Option[ServiceNodeInfo] = { + withDiscoveryClient(sessionConf) { discoveryClient => + engine.getServiceNode(discoveryClient, _client.hostPort) + } + } + private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = handleSessionException { withDiscoveryClient(sessionConf) { discoveryClient => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala index 85ec536853a..0c9a0bfa567 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala @@ -35,12 +35,20 @@ import org.apache.kyuubi.sql.schema.{Column, Row, Schema} case class DescribeEngine() extends RunnableCommand { override def run(kyuubiSession: KyuubiSession): Unit = { - val rows = Seq(kyuubiSession).map { session => - lazy val client = session.asInstanceOf[KyuubiSessionImpl].client + val rows = Seq(kyuubiSession.asInstanceOf[KyuubiSessionImpl]).map { session => + lazy val client = session.client val values = new ListBuffer[String]() values += client.engineId.getOrElse("") values += client.engineName.getOrElse("") values += client.engineUrl.getOrElse("") + session.getEngineNode match { + case Some(nodeInfo) => + values += s"${nodeInfo.host}:${nodeInfo.port}" + values += nodeInfo.version.getOrElse("") + values += nodeInfo.attributes.mkString(",") + case None => + values += ("", "", "") + } Row(values.toList) } iter = new IterableFetchIterator(rows) @@ -59,6 +67,9 @@ object DescribeEngine { Seq( Column("ENGINE_ID", TTypeId.STRING_TYPE, Some("Kyuubi engine identify")), Column("ENGINE_NAME", TTypeId.STRING_TYPE, Some("Kyuubi engine name")), - Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url"))) + Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url")), + Column("ENGINE_INSTANCE", TTypeId.STRING_TYPE, Some("Kyuubi engine instance host and port")), + Column("ENGINE_VERSION", TTypeId.STRING_TYPE, Some("Kyuubi engine version")), + Column("ENGINE_ATTRIBUTES", TTypeId.STRING_TYPE, Some("Kyuubi engine attributes"))) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala index d9488abd62f..1b11fb827ef 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala @@ -25,10 +25,13 @@ class DescribeEngineSuite extends ExecutedCommandExecSuite { val resultSet = statement.executeQuery(s"KYUUBI $desc ENGINE") assert(resultSet.next()) - assert(resultSet.getMetaData.getColumnCount == 3) + assert(resultSet.getMetaData.getColumnCount == 6) assert(resultSet.getMetaData.getColumnName(1) == "ENGINE_ID") assert(resultSet.getMetaData.getColumnName(2) == "ENGINE_NAME") assert(resultSet.getMetaData.getColumnName(3) == "ENGINE_URL") + assert(resultSet.getMetaData.getColumnName(4) == "ENGINE_INSTANCE") + assert(resultSet.getMetaData.getColumnName(5) == "ENGINE_VERSION") + assert(resultSet.getMetaData.getColumnName(6) == "ENGINE_ATTRIBUTES") } } }