From 83c7b15523c5db6e8652e0ccf5122966fef9aeb7 Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Thu, 4 Jan 2024 17:32:32 +0800 Subject: [PATCH] [KYUUBI #5894] Separate closed and online sessions[statements] in the SparkUI's engine tab --- .../org/apache/spark/ui/EnginePage.scala | 290 +++++++++++++----- .../apache/spark/ui/EngineSessionPage.scala | 151 ++++++--- .../org/apache/spark/ui/EngineTabSuite.scala | 21 +- 3 files changed, 333 insertions(+), 129 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index 7188ac62f62..36c9b12fdc9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -23,6 +23,7 @@ import java.util.Date import javax.servlet.http.HttpServletRequest import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.mutable import scala.xml.{Node, Unparsed} import org.apache.commons.text.StringEscapeUtils @@ -36,18 +37,42 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { private val store = parent.store override def render(request: HttpServletRequest): Seq[Node] = { + val onlineSession = new mutable.ArrayBuffer[SessionEvent]() + val closedSession = new mutable.ArrayBuffer[SessionEvent]() + + val runningSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]() + val completedSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]() + val failedSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]() + + store.getSessionList.foreach { s => + if (s.endTime <= 0L) { + onlineSession += s + } else { + closedSession += s + } + } + + store.getStatementList.foreach { op => + if (op.completeTime <= 0L) { + runningSqlStat += op + } else if (op.exception.isDefined) { + failedSqlStat += op + } else { + completedSqlStat += op + } + } + val content = generateBasicStats() ++
++ stop(request) ++
++

- {store.getSessionCount} session(s) are online, - running {store.getStatementCount} - operations + {onlineSession.size} session(s) are online, + running {runningSqlStat.size} operation(s)

++ - generateSessionStatsTable(request) ++ - generateStatementStatsTable(request) + generateSessionStatsTable(request, onlineSession, closedSession) ++ + generateStatementStatsTable(request, runningSqlStat, completedSqlStat, failedSqlStat) UIUtils.headerSparkPage(request, parent.name, content, parent) } @@ -129,102 +154,199 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { } } - /** Generate stats of statements for the engine */ - private def generateStatementStatsTable(request: HttpServletRequest): Seq[Node] = { - - val numStatement = store.getStatementList.size - - val table = - if (numStatement > 0) { + /** Generate stats of running statements for the engine */ + private def generateStatementStatsTable( + request: HttpServletRequest, + running: Seq[SparkOperationEvent], + completed: Seq[SparkOperationEvent], + failed: Seq[SparkOperationEvent]): Seq[Node] = { + + val content = mutable.ListBuffer[Node]() + if (running.nonEmpty) { + val sqlTableTag = "running" + val table = + statementStatsTable(request, sqlTableTag, parent, running) + content ++= + +

+ + Running Statement Statistics ( + {running.size} + ) +

+
++ +
+ {table} +
+ } - val sqlTableTag = "sqlstat" + if (completed.nonEmpty) { + val table = { + val sessionTableTag = "completed" + statementStatsTable( + request, + sessionTableTag, + parent, + completed) + } - val sqlTablePage = - Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + content ++= + +

+ + Completed Statement Statistics ( + {completed.size} + ) +

+
++ +
+ {table} +
+ } - try { - Some(new StatementStatsPagedTable( - request, - parent, - store.getStatementList, - "kyuubi", - UIUtils.prependBaseUri(request, parent.basePath), - sqlTableTag).table(sqlTablePage)) - } catch { - case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => - Some(
-

Error while rendering job table:

-
-              {Utils.stringifyException(e)}
-            
-
) - } - } else { - None + if (failed.nonEmpty) { + val table = { + val sessionTableTag = "failed" + statementStatsTable( + request, + sessionTableTag, + parent, + failed) } - val content = - -

- - Statement Statistics ({numStatement}) -

-
++ -
- {table.getOrElse("No statistics have been generated yet.")} + + content ++= + +

+ + Failed Statement Statistics ( + {failed.size} + ) +

+
++ +
+ {table}
+ } content } - /** Generate stats of sessions for the engine */ - private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { - val numSessions = store.getSessionList.size - val table = - if (numSessions > 0) { + private def statementStatsTable( + request: HttpServletRequest, + sqlTableTag: String, + parent: EngineTab, + data: Seq[SparkOperationEvent]): Seq[Node] = { + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) - val sessionTableTag = "sessionstat" + try { + new StatementStatsPagedTable( + request, + parent, + data, + "kyuubi", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag).table(sqlTablePage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering job table:

+
+                {Utils.stringifyException(e)}
+              
+
+ } + } - val sessionTablePage = - Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1) + /** Generate stats of online sessions for the engine */ + private def generateSessionStatsTable( + request: HttpServletRequest, + online: Seq[SessionEvent], + closed: Seq[SessionEvent]): Seq[Node] = { + val content = mutable.ListBuffer[Node]() + if (online.nonEmpty) { + val sessionTableTag = "online" + val table = sessionTable( + request, + sessionTableTag, + parent, + online) + content ++= + +

+ + Online Session Statistics ( + {online.size} + ) +

+
++ +
+ {table} +
+ } - try { - Some(new SessionStatsPagedTable( - request, - parent, - store.getSessionList, - "kyuubi", - UIUtils.prependBaseUri(request, parent.basePath), - sessionTableTag).table(sessionTablePage)) - } catch { - case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => - Some(
-

Error while rendering job table:

-
-              {Utils.stringifyException(e)}
-            
-
) - } - } else { - None + if (closed.nonEmpty) { + val table = { + val sessionTableTag = "closed" + sessionTable( + request, + sessionTableTag, + parent, + closed) } - val content = - -

- - Session Statistics ({numSessions}) -

-
++ -
- {table.getOrElse("No statistics have been generated yet.")} + content ++= + +

+ + Closed Session Statistics ( + {closed.size} + ) +

+
++ +
+ {table}
- + } content } + private def sessionTable( + request: HttpServletRequest, + sessionTage: String, + parent: EngineTab, + data: Seq[SessionEvent]): Seq[Node] = { + val sessionPage = + Option(request.getParameter(s"$sessionTage.page")).map(_.toInt).getOrElse(1) + try { + new SessionStatsPagedTable( + request, + parent, + data, + "kyuubi", + UIUtils.prependBaseUri(request, parent.basePath), + sessionTage).table(sessionPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering job table:

+
+            {Utils.stringifyException(e)}
+          
+
+ } + } + private class SessionStatsPagedTable( request: HttpServletRequest, parent: EngineTab, diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala index cdfc6d31355..e6405269178 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala @@ -20,6 +20,7 @@ package org.apache.spark.ui import java.util.Date import javax.servlet.http.HttpServletRequest +import scala.collection.mutable import scala.xml.Node import org.apache.spark.internal.Logging @@ -27,6 +28,8 @@ import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN import org.apache.spark.ui.UIUtils._ import org.apache.spark.util.Utils +import org.apache.kyuubi.engine.spark.events.SparkOperationEvent + /** Page for Spark Web UI that shows statistics of jobs running in the engine server */ case class EngineSessionPage(parent: EngineTab) extends WebUIPage("session") with Logging { @@ -126,50 +129,118 @@ case class EngineSessionPage(parent: EngineTab) /** Generate stats of batch statements of the engine server */ private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = { - val executionList = store.getStatementList + val running = new mutable.ArrayBuffer[SparkOperationEvent]() + val completed = new mutable.ArrayBuffer[SparkOperationEvent]() + val failed = new mutable.ArrayBuffer[SparkOperationEvent]() + + store.getStatementList .filter(_.sessionId == sessionID) - val numStatement = executionList.size - val table = - if (numStatement > 0) { - - val sqlTableTag = "sqlsessionstat" - - val sqlTablePage = - Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) - - try { - Some(new StatementStatsPagedTable( - request, - parent, - executionList, - "kyuubi/session", - UIUtils.prependBaseUri(request, parent.basePath), - sqlTableTag).table(sqlTablePage)) - } catch { - case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => - Some(
-

Error while rendering job table:

-
-              {Utils.exceptionString(e)}
-            
-
) + .foreach { op => + if (op.completeTime <= 0L) { + running += op + } else if (op.exception.isDefined) { + failed += op + } else { + completed += op } - } else { - None } - val content = - -

- - Statement Statistics -

-
++ -
- {table.getOrElse("No statistics have been generated yet.")} -
+ val content = mutable.ListBuffer[Node]() + if (running.nonEmpty) { + val sqlTableTag = "running" + val table = statementStatsTable(request, sqlTableTag, parent, running) + content ++= + +

+ + Running Statement Statistics +

+
++ +
+ {table} +
+ } + + if (completed.nonEmpty) { + val table = { + val sessionTableTag = "completed" + statementStatsTable( + request, + sessionTableTag, + parent, + completed) + } + + content ++= + +

+ + Completed Statement Statistics ( + {completed.size} + ) +

+
++ +
+ {table} +
+ } + + if (failed.nonEmpty) { + val table = { + val sessionTableTag = "failed" + statementStatsTable( + request, + sessionTableTag, + parent, + failed) + } + + content ++= + +

+ + Failed Statement Statistics ( + {failed.size} + ) +

+
++ +
+ {table} +
+ } content } + + private def statementStatsTable( + request: HttpServletRequest, + sqlTableTag: String, + parent: EngineTab, + data: Seq[SparkOperationEvent]): Seq[Node] = { + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + new StatementStatsPagedTable( + request, + parent, + data, + "kyuubi/session", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag).table(sqlTablePage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering job table:

+
+            {Utils.exceptionString(e)}
+          
+
+ } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala index 260dbf87e17..ad056a0643d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala @@ -96,10 +96,10 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { val resp = EntityUtils.toString(response.getEntity) // check session section - assert(resp.contains("Session Statistics")) + assert(resp.contains("Online Session Statistics")) // check session stats table id - assert(resp.contains("sessionstat")) + assert(resp.contains("onlineSessionstat")) // check session stats table title assert(resp.contains("Total Statements")) @@ -110,10 +110,11 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { assert(spark.sparkContext.ui.nonEmpty) val client = HttpClients.createDefault() val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/") - val response = client.execute(req) + var response = client.execute(req) assert(response.getStatusLine.getStatusCode === 200) - val resp = EntityUtils.toString(response.getEntity) + var resp = EntityUtils.toString(response.getEntity) assert(resp.contains("0 session(s) are online,")) + assert(!resp.contains("Statement Statistics")) withJdbcStatement() { statement => statement.execute( """ @@ -133,13 +134,23 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { // check session section assert(resp.contains("Statement Statistics")) + assert(!resp.contains("Failed Statement Statistics")) // check sql stats table id - assert(resp.contains("sqlstat")) + assert(resp.contains("runningSqlstat") || resp.contains("completedSqlstat")) + + assert(resp.contains("1 session(s) are online,")) // check sql stats table title assert(resp.contains("Query Details")) } + response = client.execute(req) + assert(response.getStatusLine.getStatusCode === 200) + resp = EntityUtils.toString(response.getEntity) + assert(resp.contains("0 session(s) are online,")) + assert(resp.contains("running 0 operation(s)")) + assert(resp.contains("completedSqlstat")) + assert(resp.contains("Completed Statement Statistics")) } test("statement redact for engine tab") {