diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala index 2e33d8ce6db..6b5ea92fce5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala @@ -113,4 +113,29 @@ object KyuubiSparkUtil extends Logging { SparkSQLEngine.kyuubiConf.get(configEntry) } } + + def formatDurationNano(nanoseconds: Long): String = { + formatDuration(nanoseconds / 1000000) + } + + def formatDuration(milliseconds: Long): String = { + if (milliseconds < 100) { + return "%d ms".format(milliseconds) + } + val seconds = milliseconds.toDouble / 1000 + if (seconds < 1) { + return "%.1f s".format(seconds) + } + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + "%.1f h".format(hours) + } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala index 2d6649d25e1..610b1645c2f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala @@ -47,7 +47,9 @@ case class SessionEvent( conf: Map[String, String], startTime: Long, var endTime: Long = -1L, - var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent { + var totalOperations: Int = 0, + var sessionRunTime: Long = 0, + var sessionCpuTime: Long = 0) extends KyuubiEvent with SparkListenerEvent { override lazy val partitions: Seq[(String, String)] = ("day", Utils.getDateFromTimestamp(startTime)) :: Nil diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala index 319e1c09c4f..4b529cdafb9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala @@ -59,7 +59,9 @@ case class SparkOperationEvent( exception: Option[Throwable], sessionId: String, sessionUser: String, - executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent { + executionId: Option[Long], + operationRunTime: Option[Long], + operationCpuTime: Option[Long]) extends KyuubiEvent with SparkListenerEvent { override def partitions: Seq[(String, String)] = ("day", Utils.getDateFromTimestamp(createTime)) :: Nil @@ -79,7 +81,9 @@ case class SparkOperationEvent( object SparkOperationEvent { def apply( operation: SparkOperation, - executionId: Option[Long] = None): SparkOperationEvent = { + executionId: Option[Long] = None, + operationRunTime: Option[Long] = None, + operationCpuTime: Option[Long] = None): SparkOperationEvent = { val session = operation.getSession val status = operation.getStatus new SparkOperationEvent( @@ -94,6 +98,8 @@ object SparkOperationEvent { status.exception, session.handle.identifier.toString, session.user, - executionId) + executionId, + operationRunTime, + operationCpuTime) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 88ebc306b66..b92dadba79b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -30,6 +30,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY} +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY} import org.apache.kyuubi.engine.spark.events.SparkOperationEvent import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY @@ -124,7 +125,20 @@ abstract class SparkOperation(session: Session) override protected def setState(newState: OperationState): Unit = { super.setState(newState) if (eventEnabled) { - EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId))) + EventBus.post(SparkOperationEvent( + this, + operationListener.flatMap(_.getExecutionId), + operationListener.map(_.operationRunTime.get()), + operationListener.map(_.operationCpuTime.get()))) + if (OperationState.isTerminal(newState)) { + operationListener.foreach(l => { + info(s"statementId=${statementId}, " + + s"operationRunTime=${KyuubiSparkUtil.formatDuration(l.operationRunTime.get())}, " + + s"operationCpuTime=${KyuubiSparkUtil.formatDurationNano(l.operationCpuTime.get())}") + session.asInstanceOf[SparkSessionImpl].increaseRunTime(l.operationRunTime.get()) + session.asInstanceOf[SparkSessionImpl].increaseCpuTime(l.operationCpuTime.get()) + }) + } } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 08bd09b4483..17d9dc719f5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -17,11 +17,14 @@ package org.apache.kyuubi.engine.spark.session +import java.util.concurrent.atomic.AtomicLong + import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil import org.apache.kyuubi.engine.spark.events.SessionEvent import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager import org.apache.kyuubi.engine.spark.udf.KDFRegistry @@ -110,7 +113,12 @@ class SparkSessionImpl( } override def close(): Unit = { + info(s"sessionId=${sessionEvent.sessionId}, " + + s"sessionRunTime=${KyuubiSparkUtil.formatDuration(sessionRunTime.get())}, " + + s"sessionCpuTime=${KyuubiSparkUtil.formatDurationNano(sessionCpuTime.get())}") sessionEvent.endTime = System.currentTimeMillis() + sessionEvent.sessionRunTime = sessionRunTime.get() + sessionEvent.sessionCpuTime = sessionCpuTime.get() EventBus.post(sessionEvent) super.close() spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable) @@ -118,4 +126,15 @@ class SparkSessionImpl( sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closePythonProcess( handle) } + + val sessionRunTime = new AtomicLong(0) + val sessionCpuTime = new AtomicLong(0) + + def increaseRunTime(time: Long): Unit = { + sessionRunTime.getAndAdd(time) + } + + def increaseCpuTime(time: Long): Unit = { + sessionCpuTime.getAndAdd(time) + } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index a7d409c7ca5..5a7187ec585 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -19,6 +19,7 @@ package org.apache.spark.kyuubi import java.util.Properties import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ @@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL} import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY} import org.apache.kyuubi.engine.spark.operation.ExecuteStatement import org.apache.kyuubi.operation.Operation @@ -61,6 +63,9 @@ class SQLOperationListener( None } + val operationRunTime = new AtomicLong(0) + val operationCpuTime = new AtomicLong(0) + def getExecutionId: Option[Long] = executionId // For broadcast, Spark will introduce a new runId as SPARK_JOB_GROUP_ID, see: @@ -140,6 +145,14 @@ class SQLOperationListener( val stageInfo = stageCompleted.stageInfo val stageId = stageInfo.stageId val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) + val taskMetrics = stageInfo.taskMetrics + if (taskMetrics != null) { + info(s"stageId=${stageCompleted.stageInfo.stageId}, " + + s"stageRunTime=${KyuubiSparkUtil.formatDuration(taskMetrics.executorRunTime)}, " + + s"stageCpuTime=${KyuubiSparkUtil.formatDurationNano(taskMetrics.executorCpuTime)}") + operationRunTime.getAndAdd(taskMetrics.executorRunTime) + operationCpuTime.getAndAdd(taskMetrics.executorCpuTime) + } activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { stageInfo.getStatusString match {