
Commit 444d4aa

Collect and summarize the executorRunTime and executorCpuTime of the statement

XorSum committed Feb 28, 2024
1 parent 84e60e9
Showing 6 changed files with 84 additions and 5 deletions.
KyuubiSparkUtil.scala

@@ -113,4 +113,29 @@ object KyuubiSparkUtil extends Logging {
       SparkSQLEngine.kyuubiConf.get(configEntry)
     }
   }
+
+  def formatDurationNano(nanoseconds: Long): String = {
+    formatDuration(nanoseconds / 1000000)
+  }
+
+  def formatDuration(milliseconds: Long): String = {
+    if (milliseconds < 100) {
+      return "%d ms".format(milliseconds)
+    }
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 1) {
+      return "%.1f s".format(seconds)
+    }
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    "%.1f h".format(hours)
+  }
 }
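
For reference, the new thresholds resolve as follows (a REPL-style sketch, not part of the commit; outputs derived from the logic above):

  import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{formatDuration, formatDurationNano}

  formatDuration(42L)             // "42 ms"   (< 100 ms: raw milliseconds)
  formatDuration(500L)            // "0.5 s"   (< 1 s: one decimal place)
  formatDuration(42000L)          // "42 s"    (< 60 s: whole seconds)
  formatDuration(150000L)         // "2.5 min" (< 10 min: one decimal place)
  formatDuration(1800000L)        // "30 min"  (< 60 min: whole minutes)
  formatDuration(5400000L)        // "1.5 h"   (>= 60 min: decimal hours)
  formatDurationNano(2000000000L) // "2 s"     (nanoseconds / 1000000 -> 2000 ms)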
SessionEvent.scala

@@ -47,7 +47,9 @@ case class SessionEvent(
     conf: Map[String, String],
     startTime: Long,
     var endTime: Long = -1L,
-    var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
+    var totalOperations: Int = 0,
+    var sessionRunTime: Long = 0,
+    var sessionCpuTime: Long = 0) extends KyuubiEvent with SparkListenerEvent {
 
   override lazy val partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
SparkOperationEvent.scala

@@ -59,7 +59,9 @@ case class SparkOperationEvent(
     exception: Option[Throwable],
     sessionId: String,
     sessionUser: String,
-    executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
+    executionId: Option[Long],
+    operationRunTime: Option[Long],
+    operationCpuTime: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
 
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
@@ -79,7 +81,9 @@ case class SparkOperationEvent(
 object SparkOperationEvent {
   def apply(
       operation: SparkOperation,
-      executionId: Option[Long] = None): SparkOperationEvent = {
+      executionId: Option[Long] = None,
+      operationRunTime: Option[Long] = None,
+      operationCpuTime: Option[Long] = None): SparkOperationEvent = {
     val session = operation.getSession
     val status = operation.getStatus
     new SparkOperationEvent(
@@ -94,6 +98,8 @@
       status.exception,
       session.handle.identifier.toString,
       session.user,
-      executionId)
+      executionId,
+      operationRunTime,
+      operationCpuTime)
   }
 }
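
Because the two new apply parameters default to None, existing call sites keep compiling unchanged; only the setState hook below passes real values. Hypothetical call sites for illustration (op stands in for some SparkOperation instance, not code from this commit):

  SparkOperationEvent(op)             // operationRunTime/operationCpuTime default to None
  SparkOperationEvent(op, Some(42L))  // executionId only; the trailing defaults still apply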
SparkOperation.scala

@@ -30,6 +30,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
 import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
 import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
@@ -124,7 +125,20 @@
   override protected def setState(newState: OperationState): Unit = {
     super.setState(newState)
     if (eventEnabled) {
-      EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId)))
+      EventBus.post(SparkOperationEvent(
+        this,
+        operationListener.flatMap(_.getExecutionId),
+        operationListener.map(_.operationRunTime.get()),
+        operationListener.map(_.operationCpuTime.get())))
+      if (OperationState.isTerminal(newState)) {
+        operationListener.foreach(l => {
+          info(s"statementId=${statementId}, " +
+            s"operationRunTime=${KyuubiSparkUtil.formatDuration(l.operationRunTime.get())}, " +
+            s"operationCpuTime=${KyuubiSparkUtil.formatDurationNano(l.operationCpuTime.get())}")
+          session.asInstanceOf[SparkSessionImpl].increaseRunTime(l.operationRunTime.get())
+          session.asInstanceOf[SparkSessionImpl].increaseCpuTime(l.operationCpuTime.get())
+        })
+      }
     }
   }
 
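Note the flow here: an event is posted on every state change, but the per-operation totals are logged and rolled into the session counters only once the operation reaches a terminal state. With the interpolation above, the terminal log line comes out roughly like this (values invented for illustration):

  statementId=b2f1..., operationRunTime=2.5 min, operationCpuTime=42 s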
SparkSessionImpl.scala

@@ -17,11 +17,14 @@
 
 package org.apache.kyuubi.engine.spark.session
 
+import java.util.concurrent.atomic.AtomicLong
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.events.SessionEvent
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
 import org.apache.kyuubi.engine.spark.udf.KDFRegistry
@@ -110,12 +113,28 @@ class SparkSessionImpl(
   }
 
   override def close(): Unit = {
+    info(s"sessionId=${sessionEvent.sessionId}, " +
+      s"sessionRunTime=${KyuubiSparkUtil.formatDuration(sessionRunTime.get())}, " +
+      s"sessionCpuTime=${KyuubiSparkUtil.formatDurationNano(sessionCpuTime.get())}")
     sessionEvent.endTime = System.currentTimeMillis()
+    sessionEvent.sessionRunTime = sessionRunTime.get()
+    sessionEvent.sessionCpuTime = sessionCpuTime.get()
     EventBus.post(sessionEvent)
     super.close()
     spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
     sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
     sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closePythonProcess(
       handle)
   }
+
+  val sessionRunTime = new AtomicLong(0)
+  val sessionCpuTime = new AtomicLong(0)
+
+  def increaseRunTime(time: Long): Unit = {
+    sessionRunTime.getAndAdd(time)
+  }
+
+  def increaseCpuTime(time: Long): Unit = {
+    sessionCpuTime.getAndAdd(time)
+  }
 }
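
A session can have several operations completing concurrently, which is presumably why the running totals are AtomicLongs rather than plain vars: getAndAdd is an atomic read-modify-write, so concurrent increments are never lost. A minimal standalone sketch of the pattern (not Kyuubi code):

  import java.util.concurrent.atomic.AtomicLong

  val total = new AtomicLong(0)
  // Four "operations" each reporting 250 ms of run time, concurrently:
  val threads = (1 to 4).map(_ => new Thread(() => total.getAndAdd(250L)))
  threads.foreach(_.start())
  threads.foreach(_.join())
  assert(total.get() == 1000L) // no lost updates, unlike `var total` with +=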
SQLOperationListener.scala

@@ -19,6 +19,7 @@ package org.apache.spark.kyuubi
 
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
@@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
 import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
 import org.apache.kyuubi.operation.Operation
@@ -61,6 +63,9 @@ class SQLOperationListener(
     None
   }
 
+  val operationRunTime = new AtomicLong(0)
+  val operationCpuTime = new AtomicLong(0)
+
   def getExecutionId: Option[Long] = executionId
 
   // For broadcast, Spark will introduce a new runId as SPARK_JOB_GROUP_ID, see:
@@ -140,6 +145,14 @@
     val stageInfo = stageCompleted.stageInfo
     val stageId = stageInfo.stageId
     val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
+    val taskMetrics = stageInfo.taskMetrics
+    if (taskMetrics != null) {
+      info(s"stageId=${stageCompleted.stageInfo.stageId}, " +
+        s"stageRunTime=${KyuubiSparkUtil.formatDuration(taskMetrics.executorRunTime)}, " +
+        s"stageCpuTime=${KyuubiSparkUtil.formatDurationNano(taskMetrics.executorCpuTime)}")
+      operationRunTime.getAndAdd(taskMetrics.executorRunTime)
+      operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
+    }
     activeStages.synchronized {
       if (activeStages.remove(stageAttempt) != null) {
         stageInfo.getStatusString match {
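
The asymmetry in formatters reflects the units of Spark's TaskMetrics: executorRunTime is reported in milliseconds while executorCpuTime is in nanoseconds, and the null check guards stages that complete without task metrics. A minimal standalone listener showing the same aggregation idea (StageTimeAggregator is an assumed name, not the commit's class):

  import java.util.concurrent.atomic.AtomicLong
  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

  // Sums executor time across completed stages, as the hunk above does.
  class StageTimeAggregator extends SparkListener {
    val runTimeMs = new AtomicLong(0) // executorRunTime: milliseconds
    val cpuTimeNs = new AtomicLong(0) // executorCpuTime: nanoseconds

    override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
      val metrics = event.stageInfo.taskMetrics
      if (metrics != null) { // may be null when the stage produced no metrics
        runTimeMs.getAndAdd(metrics.executorRunTime)
        cpuTimeNs.getAndAdd(metrics.executorCpuTime)
      }
    }
  }

Such a listener would be attached with spark.sparkContext.addSparkListener(new StageTimeAggregator); the commit instead wires the counters into the existing per-statement SQLOperationListener.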
