- {table.getOrElse("No statistics have been generated yet.")}
+
+ content ++=
+
+ }
content
}
- /** Generate stats of sessions for the engine */
- private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
- val numSessions = store.getSessionList.size
- val table =
- if (numSessions > 0) {
+ private def statementStatsTable(
+ request: HttpServletRequest,
+ sqlTableTag: String,
+ parent: EngineTab,
+ data: Seq[SparkOperationEvent]): Seq[Node] = {
+
+ val sqlTablePage =
+ Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
- val sessionTableTag = "sessionstat"
+ try {
+ new StatementStatsPagedTable(
+ request,
+ parent,
+ data,
+ "kyuubi",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ sqlTableTag).table(sqlTablePage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+
+ }
+ }
- val sessionTablePage =
- Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1)
+ /** Generate stats of online sessions for the engine */
+ private def generateSessionStatsTable(
+ request: HttpServletRequest,
+ online: Seq[SessionEvent],
+ closed: Seq[SessionEvent]): Seq[Node] = {
+ val content = mutable.ListBuffer[Node]()
+ if (online.nonEmpty) {
+ val sessionTableTag = "online"
+ val table = sessionTable(
+ request,
+ sessionTableTag,
+ parent,
+ online)
+ content ++=
+
+ }
- try {
- Some(new SessionStatsPagedTable(
- request,
- parent,
- store.getSessionList,
- "kyuubi",
- UIUtils.prependBaseUri(request, parent.basePath),
- sessionTableTag).table(sessionTablePage))
- } catch {
- case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
- Some(
)
- }
- } else {
- None
+ if (closed.nonEmpty) {
+ val table = {
+ val sessionTableTag = "closed"
+ sessionTable(
+ request,
+ sessionTableTag,
+ parent,
+ closed)
}
- val content =
-
- {table.getOrElse("No statistics have been generated yet.")}
+ content ++=
+
+
+ ++
+
+ {table}
-
+ }
content
}
+ private def sessionTable(
+ request: HttpServletRequest,
+ sessionTage: String,
+ parent: EngineTab,
+ data: Seq[SessionEvent]): Seq[Node] = {
+ val sessionPage =
+ Option(request.getParameter(s"$sessionTage.page")).map(_.toInt).getOrElse(1)
+ try {
+ new SessionStatsPagedTable(
+ request,
+ parent,
+ data,
+ "kyuubi",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ sessionTage).table(sessionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+
+
Error while rendering job table:
+
+ {Utils.stringifyException(e)}
+
+
+ }
+ }
+
private class SessionStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
index cdfc6d31355..e6405269178 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui
import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable
import scala.xml.Node
import org.apache.spark.internal.Logging
@@ -27,6 +28,8 @@ import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
import org.apache.spark.ui.UIUtils._
import org.apache.spark.util.Utils
+import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
+
/** Page for Spark Web UI that shows statistics of jobs running in the engine server */
case class EngineSessionPage(parent: EngineTab)
extends WebUIPage("session") with Logging {
@@ -126,50 +129,118 @@ case class EngineSessionPage(parent: EngineTab)
/** Generate stats of batch statements of the engine server */
private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
- val executionList = store.getStatementList
+ val running = new mutable.ArrayBuffer[SparkOperationEvent]()
+ val completed = new mutable.ArrayBuffer[SparkOperationEvent]()
+ val failed = new mutable.ArrayBuffer[SparkOperationEvent]()
+
+ store.getStatementList
.filter(_.sessionId == sessionID)
- val numStatement = executionList.size
- val table =
- if (numStatement > 0) {
-
- val sqlTableTag = "sqlsessionstat"
-
- val sqlTablePage =
- Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
-
- try {
- Some(new StatementStatsPagedTable(
- request,
- parent,
- executionList,
- "kyuubi/session",
- UIUtils.prependBaseUri(request, parent.basePath),
- sqlTableTag).table(sqlTablePage))
- } catch {
- case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
- Some(
-
Error while rendering job table:
-
- {Utils.exceptionString(e)}
-
-
)
+ .foreach { op =>
+ if (op.completeTime <= 0L) {
+ running += op
+ } else if (op.exception.isDefined) {
+ failed += op
+ } else {
+ completed += op
}
- } else {
- None
}
- val content =
-
-
- ++
-
- {table.getOrElse("No statistics have been generated yet.")}
-
+ val content = mutable.ListBuffer[Node]()
+ if (running.nonEmpty) {
+ val sqlTableTag = "running"
+ val table = statementStatsTable(request, sqlTableTag, parent, running)
+ content ++=
+
+
+ ++
+
+ {table}
+
+ }
+
+ if (completed.nonEmpty) {
+ val table = {
+ val sessionTableTag = "completed"
+ statementStatsTable(
+ request,
+ sessionTableTag,
+ parent,
+ completed)
+ }
+
+ content ++=
+
+
+ ++
+
+ {table}
+
+ }
+
+ if (failed.nonEmpty) {
+ val table = {
+ val sessionTableTag = "failed"
+ statementStatsTable(
+ request,
+ sessionTableTag,
+ parent,
+ failed)
+ }
+
+ content ++=
+
+
+ ++
+
+ {table}
+
+ }
content
}
+
+ private def statementStatsTable(
+ request: HttpServletRequest,
+ sqlTableTag: String,
+ parent: EngineTab,
+ data: Seq[SparkOperationEvent]): Seq[Node] = {
+ val sqlTablePage =
+ Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+
+ try {
+ new StatementStatsPagedTable(
+ request,
+ parent,
+ data,
+ "kyuubi/session",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ sqlTableTag).table(sqlTablePage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+
+
Error while rendering job table:
+
+ {Utils.exceptionString(e)}
+
+
+ }
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
index 260dbf87e17..ad056a0643d 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
@@ -96,10 +96,10 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
val resp = EntityUtils.toString(response.getEntity)
// check session section
- assert(resp.contains("Session Statistics"))
+ assert(resp.contains("Online Session Statistics"))
// check session stats table id
- assert(resp.contains("sessionstat"))
+ assert(resp.contains("onlineSessionstat"))
// check session stats table title
assert(resp.contains("Total Statements"))
@@ -110,10 +110,11 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
assert(spark.sparkContext.ui.nonEmpty)
val client = HttpClients.createDefault()
val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
- val response = client.execute(req)
+ var response = client.execute(req)
assert(response.getStatusLine.getStatusCode === 200)
- val resp = EntityUtils.toString(response.getEntity)
+ var resp = EntityUtils.toString(response.getEntity)
assert(resp.contains("0 session(s) are online,"))
+ assert(!resp.contains("Statement Statistics"))
withJdbcStatement() { statement =>
statement.execute(
"""
@@ -133,13 +134,23 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
// check session section
assert(resp.contains("Statement Statistics"))
+ assert(!resp.contains("Failed Statement Statistics"))
// check sql stats table id
- assert(resp.contains("sqlstat"))
+ assert(resp.contains("runningSqlstat") || resp.contains("completedSqlstat"))
+
+ assert(resp.contains("1 session(s) are online,"))
// check sql stats table title
assert(resp.contains("Query Details"))
}
+ response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ resp = EntityUtils.toString(response.getEntity)
+ assert(resp.contains("0 session(s) are online,"))
+ assert(resp.contains("running 0 operation(s)"))
+ assert(resp.contains("completedSqlstat"))
+ assert(resp.contains("Completed Statement Statistics"))
}
test("statement redact for engine tab") {