diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index 7188ac62f62..cae0c03bf4d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -23,6 +23,7 @@ import java.util.Date import javax.servlet.http.HttpServletRequest import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.mutable import scala.xml.{Node, Unparsed} import org.apache.commons.text.StringEscapeUtils @@ -36,18 +37,46 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { private val store = parent.store override def render(request: HttpServletRequest): Seq[Node] = { + val onlineSession = new mutable.ArrayBuffer[SessionEvent]() + val closedSession = new mutable.ArrayBuffer[SessionEvent]() + + val runningSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]() + val completedSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]() + val failedSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]() + + store.getSessionList.foreach { s => + if (s.endTime <= 0L) { + onlineSession += s + } else { + closedSession += s + } + } + + store.getStatementList.foreach { op => + if (op.completeTime <= 0L) { + runningSqlStat += op + } else if (op.exception.isDefined) { + failedSqlStat += op + } else { + completedSqlStat += op + } + } + val content = generateBasicStats() ++ <br/> ++ stop(request) ++ <br/> ++ <h4> - {store.getSessionCount} session(s) are online, - running {store.getStatementCount} - operations + {onlineSession.size} session(s) are online, + running {runningSqlStat.size} operation(s) </h4> ++ - generateSessionStatsTable(request) ++ - generateStatementStatsTable(request) + generateSessionStatsTable(request, onlineSession.toSeq, closedSession.toSeq) ++ + generateStatementStatsTable( + request, + runningSqlStat.toSeq, + completedSqlStat.toSeq, + failedSqlStat.toSeq) UIUtils.headerSparkPage(request, parent.name, content, parent) } @@ -129,102 +158,199 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { } } - /** Generate stats of statements for the engine */ - private def generateStatementStatsTable(request: HttpServletRequest): Seq[Node] = { - - val numStatement = store.getStatementList.size - - val table = - if (numStatement > 0) { + /** Generate stats of running statements for the engine */ + private def generateStatementStatsTable( + request: HttpServletRequest, + running: Seq[SparkOperationEvent], + completed: Seq[SparkOperationEvent], + failed: Seq[SparkOperationEvent]): Seq[Node] = { + + val content = mutable.ListBuffer[Node]() + if (running.nonEmpty) { + val sqlTableTag = "running-sqlstat" + val table = + statementStatsTable(request, sqlTableTag, parent, running) + content ++= + <span id="running-sqlstat" class="collapse-aggregated-runningSqlstat collapse-table" + onClick="collapseTable('collapse-aggregated-runningSqlstat', + 'aggregated-runningSqlstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Running Statement Statistics ( + {running.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-runningSqlstat collapsible-table"> + {table} + </div> + } - val sqlTableTag = "sqlstat" + if (completed.nonEmpty) { + val table = { + val sqlTableTag = "completed-sqlstat" + statementStatsTable( + request, + sqlTableTag, + parent, + completed) + } - val sqlTablePage = - Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + content ++= + <span id="completed-sqlstat" class="collapse-aggregated-completedSqlstat collapse-table" + onClick="collapseTable('collapse-aggregated-completedSqlstat', + 'aggregated-completedSqlstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Completed Statement Statistics ( + {completed.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-completedSqlstat collapsible-table"> + {table} + </div> + } - try { - Some(new StatementStatsPagedTable( - request, - parent, - store.getStatementList, - "kyuubi", - UIUtils.prependBaseUri(request, parent.basePath), - sqlTableTag).table(sqlTablePage)) - } catch { - case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => - Some(<div class="alert alert-error"> - <p>Error while rendering job table:</p> - <pre> - {Utils.stringifyException(e)} - </pre> - </div>) - } - } else { - None + if (failed.nonEmpty) { + val table = { + val sqlTableTag = "failed-sqlstat" + statementStatsTable( + request, + sqlTableTag, + parent, + failed) } - val content = - <span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table" - onClick="collapseTable('collapse-aggregated-sqlstat', - 'aggregated-sqlstat')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Statement Statistics ({numStatement})</a> - </h4> - </span> ++ - <div class="aggregated-sqlstat collapsible-table"> - {table.getOrElse("No statistics have been generated yet.")} + + content ++= + <span id="failed-sqlstat" class="collapse-aggregated-failedSqlstat collapse-table" + onClick="collapseTable('collapse-aggregated-failedSqlstat', + 'aggregated-failedSqlstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Failed Statement Statistics ( + {failed.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-failedSqlstat collapsible-table"> + {table} </div> + } content } - /** Generate stats of sessions for the engine */ - private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { - val numSessions = store.getSessionList.size - val table = - if (numSessions > 0) { + private def statementStatsTable( + request: HttpServletRequest, + sqlTableTag: String, + parent: EngineTab, + data: Seq[SparkOperationEvent]): Seq[Node] = { + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) - val sessionTableTag = "sessionstat" + try { + new StatementStatsPagedTable( + request, + parent, + data, + "kyuubi", + UIUtils.prependBaseUri(request, parent.basePath), + s"${sqlTableTag}-table").table(sqlTablePage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + <div class="alert alert-error"> + <p>Error while rendering job table:</p> + <pre> + {Utils.stringifyException(e)} + </pre> + </div> + } + } - val sessionTablePage = - Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1) + /** Generate stats of online sessions for the engine */ + private def generateSessionStatsTable( + request: HttpServletRequest, + online: Seq[SessionEvent], + closed: Seq[SessionEvent]): Seq[Node] = { + val content = mutable.ListBuffer[Node]() + if (online.nonEmpty) { + val sessionTableTag = "online-sessionstat" + val table = sessionTable( + request, + sessionTableTag, + parent, + online) + content ++= + <span id="online-sessionstat" class="collapse-aggregated-onlineSessionstat collapse-table" + onClick="collapseTable('collapse-aggregated-onlineSessionstat', + 'aggregated-onlineSessionstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Online Session Statistics ( + {online.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-onlineSessionstat collapsible-table"> + {table} + </div> + } - try { - Some(new SessionStatsPagedTable( - request, - parent, - store.getSessionList, - "kyuubi", - UIUtils.prependBaseUri(request, parent.basePath), - sessionTableTag).table(sessionTablePage)) - } catch { - case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => - Some(<div class="alert alert-error"> - <p>Error while rendering job table:</p> - <pre> - {Utils.stringifyException(e)} - </pre> - </div>) - } - } else { - None + if (closed.nonEmpty) { + val table = { + val sessionTableTag = "closed-sessionstat" + sessionTable( + request, + sessionTableTag, + parent, + closed) } - val content = - <span id="sessionstat" class="collapse-aggregated-sessionstat collapse-table" - onClick="collapseTable('collapse-aggregated-sessionstat', - 'aggregated-sessionstat')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Session Statistics ({numSessions})</a> - </h4> - </span> ++ - <div class="aggregated-sessionstat collapsible-table"> - {table.getOrElse("No statistics have been generated yet.")} + content ++= + <span id="closed-sessionstat" class="collapse-aggregated-closedSessionstat collapse-table" + onClick="collapseTable('collapse-aggregated-closedSessionstat', + 'aggregated-closedSessionstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Closed Session Statistics ( + {closed.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-closedSessionstat collapsible-table"> + {table} </div> - + } content } + private def sessionTable( + request: HttpServletRequest, + sessionTage: String, + parent: EngineTab, + data: Seq[SessionEvent]): Seq[Node] = { + val sessionPage = + Option(request.getParameter(s"$sessionTage.page")).map(_.toInt).getOrElse(1) + try { + new SessionStatsPagedTable( + request, + parent, + data, + "kyuubi", + UIUtils.prependBaseUri(request, parent.basePath), + s"${sessionTage}-table").table(sessionPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + <div class="alert alert-error"> + <p>Error while rendering job table:</p> + <pre> + {Utils.stringifyException(e)} + </pre> + </div> + } + } + private class SessionStatsPagedTable( request: HttpServletRequest, parent: EngineTab, diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala index cdfc6d31355..46011ceae9a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala @@ -20,6 +20,7 @@ package org.apache.spark.ui import java.util.Date import javax.servlet.http.HttpServletRequest +import scala.collection.mutable import scala.xml.Node import org.apache.spark.internal.Logging @@ -27,6 +28,8 @@ import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN import org.apache.spark.ui.UIUtils._ import org.apache.spark.util.Utils +import org.apache.kyuubi.engine.spark.events.SparkOperationEvent + /** Page for Spark Web UI that shows statistics of jobs running in the engine server */ case class EngineSessionPage(parent: EngineTab) extends WebUIPage("session") with Logging { @@ -126,50 +129,118 @@ case class EngineSessionPage(parent: EngineTab) /** Generate stats of batch statements of the engine server */ private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = { - val executionList = store.getStatementList + val running = new mutable.ArrayBuffer[SparkOperationEvent]() + val completed = new mutable.ArrayBuffer[SparkOperationEvent]() + val failed = new mutable.ArrayBuffer[SparkOperationEvent]() + + store.getStatementList .filter(_.sessionId == sessionID) - val numStatement = executionList.size - val table = - if (numStatement > 0) { - - val sqlTableTag = "sqlsessionstat" - - val sqlTablePage = - Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) - - try { - Some(new StatementStatsPagedTable( - request, - parent, - executionList, - "kyuubi/session", - UIUtils.prependBaseUri(request, parent.basePath), - sqlTableTag).table(sqlTablePage)) - } catch { - case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => - Some(<div class="alert alert-error"> - <p>Error while rendering job table:</p> - <pre> - {Utils.exceptionString(e)} - </pre> - </div>) + .foreach { op => + if (op.completeTime <= 0L) { + running += op + } else if (op.exception.isDefined) { + failed += op + } else { + completed += op } - } else { - None } - val content = - <span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat collapse-table" - onClick="collapseTable('collapse-aggregated-sqlsessionstat', - 'aggregated-sqlsessionstat')"> - <h4> - <span class="collapse-table-arrow arrow-open"></span> - <a>Statement Statistics</a> - </h4> - </span> ++ - <div class="aggregated-sqlsessionstat collapsible-table"> - {table.getOrElse("No statistics have been generated yet.")} - </div> + val content = mutable.ListBuffer[Node]() + if (running.nonEmpty) { + val sqlTableTag = "running-sqlstat" + val table = statementStatsTable(request, sqlTableTag, parent, running.toSeq) + content ++= + <span id="running-sqlstat" class="collapse-aggregated-runningSqlstat collapse-table" + onClick="collapseTable('collapse-aggregated-runningSqlstat', + 'aggregated-runningSqlstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Running Statement Statistics</a> + </h4> + </span> ++ + <div class="aggregated-runningSqlstat collapsible-table"> + {table} + </div> + } + + if (completed.nonEmpty) { + val table = { + val sqlTableTag = "completed-sqlstat" + statementStatsTable( + request, + sqlTableTag, + parent, + completed.toSeq) + } + + content ++= + <span id="completed-sqlstat" class="collapse-aggregated-completedSqlstat collapse-table" + onClick="collapseTable('collapse-aggregated-completedSqlstat', + 'aggregated-completedSqlstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Completed Statement Statistics ( + {completed.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-completedSqlstat collapsible-table"> + {table} + </div> + } + + if (failed.nonEmpty) { + val table = { + val sqlTableTag = "failed-sqlstat" + statementStatsTable( + request, + sqlTableTag, + parent, + failed.toSeq) + } + + content ++= + <span id="failed-sqlstat" class="collapse-aggregated-failedSqlstat collapse-table" + onClick="collapseTable('collapse-aggregated-failedSqlstat', + 'aggregated-failedSqlstat')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Failed Statement Statistics ( + {failed.size} + )</a> + </h4> + </span> ++ + <div class="aggregated-failedSqlstat collapsible-table"> + {table} + </div> + } content } + + private def statementStatsTable( + request: HttpServletRequest, + sqlTableTag: String, + parent: EngineTab, + data: Seq[SparkOperationEvent]): Seq[Node] = { + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + new StatementStatsPagedTable( + request, + parent, + data, + "kyuubi/session", + UIUtils.prependBaseUri(request, parent.basePath), + s"${sqlTableTag}").table(sqlTablePage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + <div class="alert alert-error"> + <p>Error while rendering job table:</p> + <pre> + {Utils.exceptionString(e)} + </pre> + </div> + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala index 260dbf87e17..ad056a0643d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala @@ -96,10 +96,10 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { val resp = EntityUtils.toString(response.getEntity) // check session section - assert(resp.contains("Session Statistics")) + assert(resp.contains("Online Session Statistics")) // check session stats table id - assert(resp.contains("sessionstat")) + assert(resp.contains("onlineSessionstat")) // check session stats table title assert(resp.contains("Total Statements")) @@ -110,10 +110,11 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { assert(spark.sparkContext.ui.nonEmpty) val client = HttpClients.createDefault() val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/") - val response = client.execute(req) + var response = client.execute(req) assert(response.getStatusLine.getStatusCode === 200) - val resp = EntityUtils.toString(response.getEntity) + var resp = EntityUtils.toString(response.getEntity) assert(resp.contains("0 session(s) are online,")) + assert(!resp.contains("Statement Statistics")) withJdbcStatement() { statement => statement.execute( """ @@ -133,13 +134,23 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { // check session section assert(resp.contains("Statement Statistics")) + assert(!resp.contains("Failed Statement Statistics")) // check sql stats table id - assert(resp.contains("sqlstat")) + assert(resp.contains("runningSqlstat") || resp.contains("completedSqlstat")) + + assert(resp.contains("1 session(s) are online,")) // check sql stats table title assert(resp.contains("Query Details")) } + response = client.execute(req) + assert(response.getStatusLine.getStatusCode === 200) + resp = EntityUtils.toString(response.getEntity) + assert(resp.contains("0 session(s) are online,")) + assert(resp.contains("running 0 operation(s)")) + assert(resp.contains("completedSqlstat")) + assert(resp.contains("Completed Statement Statistics")) } test("statement redact for engine tab") {