Skip to content

Commit

Permalink
[KYUUBI #5797][FOLLOWUP] Desc engine command support show engine regi…
Browse files Browse the repository at this point in the history
…stered attributes

# 🔍 Description

desc engine command support show engine registered attributes.

## Issue References 🔗

This pull request fixes #5797

## Describe Your Solution 🔧

#5931 (comment)

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #5948 from Kwafoor/kyuubi_5797_desc_engine_follow.

Closes #5797

da89099 [wangjunbo] fix code
085ffc5 [wangjunbo] fix code
d58e8ec [wangjunbo] delete empty lines
cca773a [wangjunbo] fix code
9fcc2c6 [wangjunbo] delete ENGINE_NAMESPACE column
9dfb2f5 [wangjunbo] [KYUUBI #5797][FOLLOWUP] desc engine command support show engine registered attributes

Authored-by: wangjunbo <[email protected]>
Signed-off-by: Fei Wang <[email protected]>
  • Loading branch information
wangjunbo authored and turboFei committed Jan 18, 2024
1 parent d3a3853 commit 162e72b
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

class KyuubiSyncThriftClient private (
val hostPort: (String, Int),
protocol: TProtocol,
engineAliveProbeProtocol: Option[TProtocol],
engineAliveProbeInterval: Long,
Expand Down Expand Up @@ -483,6 +484,7 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
None
}
new KyuubiSyncThriftClient(
(host, port),
tProtocol,
aliveProbeProtocol,
aliveProbeInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths}
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
Expand Down Expand Up @@ -337,6 +337,13 @@ private[kyuubi] class EngineRef(
}
}

def getServiceNode(
discoveryClient: DiscoveryClient,
hostPort: (String, Int)): Option[ServiceNodeInfo] = {
val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace)
serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption
}

def close(): Unit = {
if (shareLevel == CONNECTION && builder != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KE
import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.service.authentication.InternalSecurityAccessor
Expand Down Expand Up @@ -119,6 +120,12 @@ class KyuubiSessionImpl(
engineLastAlive = System.currentTimeMillis()
}

def getEngineNode: Option[ServiceNodeInfo] = {
withDiscoveryClient(sessionConf) { discoveryClient =>
engine.getServiceNode(discoveryClient, _client.hostPort)
}
}

private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
handleSessionException {
withDiscoveryClient(sessionConf) { discoveryClient =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,20 @@ import org.apache.kyuubi.sql.schema.{Column, Row, Schema}
case class DescribeEngine() extends RunnableCommand {

override def run(kyuubiSession: KyuubiSession): Unit = {
val rows = Seq(kyuubiSession).map { session =>
lazy val client = session.asInstanceOf[KyuubiSessionImpl].client
val rows = Seq(kyuubiSession.asInstanceOf[KyuubiSessionImpl]).map { session =>
lazy val client = session.client
val values = new ListBuffer[String]()
values += client.engineId.getOrElse("")
values += client.engineName.getOrElse("")
values += client.engineUrl.getOrElse("")
session.getEngineNode match {
case Some(nodeInfo) =>
values += s"${nodeInfo.host}:${nodeInfo.port}"
values += nodeInfo.version.getOrElse("")
values += nodeInfo.attributes.mkString(",")
case None =>
values += ("", "", "")
}
Row(values.toList)
}
iter = new IterableFetchIterator(rows)
Expand All @@ -59,6 +67,9 @@ object DescribeEngine {
Seq(
Column("ENGINE_ID", TTypeId.STRING_TYPE, Some("Kyuubi engine identify")),
Column("ENGINE_NAME", TTypeId.STRING_TYPE, Some("Kyuubi engine name")),
Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url")))
Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url")),
Column("ENGINE_INSTANCE", TTypeId.STRING_TYPE, Some("Kyuubi engine instance host and port")),
Column("ENGINE_VERSION", TTypeId.STRING_TYPE, Some("Kyuubi engine version")),
Column("ENGINE_ATTRIBUTES", TTypeId.STRING_TYPE, Some("Kyuubi engine attributes")))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ class DescribeEngineSuite extends ExecutedCommandExecSuite {
val resultSet = statement.executeQuery(s"KYUUBI $desc ENGINE")
assert(resultSet.next())

assert(resultSet.getMetaData.getColumnCount == 3)
assert(resultSet.getMetaData.getColumnCount == 6)
assert(resultSet.getMetaData.getColumnName(1) == "ENGINE_ID")
assert(resultSet.getMetaData.getColumnName(2) == "ENGINE_NAME")
assert(resultSet.getMetaData.getColumnName(3) == "ENGINE_URL")
assert(resultSet.getMetaData.getColumnName(4) == "ENGINE_INSTANCE")
assert(resultSet.getMetaData.getColumnName(5) == "ENGINE_VERSION")
assert(resultSet.getMetaData.getColumnName(6) == "ENGINE_ATTRIBUTES")
}
}
}
Expand Down

0 comments on commit 162e72b

Please sign in to comment.