Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6107] [Spark] Collect and summarize the executorRunTime and executorCpuTime of the statement #6112

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,29 @@ object KyuubiSparkUtil extends Logging {
SparkSQLEngine.kyuubiConf.get(configEntry)
}
}

/**
 * Formats a duration given in nanoseconds as a short human-readable string.
 * Truncates (integer division) to whole milliseconds, then delegates to
 * [[formatDuration]] for unit selection and formatting.
 *
 * @param nanoseconds duration in nanoseconds
 * @return formatted duration string, e.g. "42 ms", "2.5 min"
 */
def formatDurationNano(nanoseconds: Long): String = {
  val millis = nanoseconds / (1000L * 1000L)
  formatDuration(millis)
}

/**
 * Formats a duration in milliseconds as a short human-readable string,
 * scaling the unit and precision with magnitude:
 * "<100 ms" as whole ms, "<1 s" as tenths of seconds, "<60 s" as whole
 * seconds, "<10 min" as tenths of minutes, "<60 min" as whole minutes,
 * otherwise tenths of hours.
 *
 * @param milliseconds duration in milliseconds
 * @return formatted string, e.g. "42 ms", "0.5 s", "30 s", "2.5 min", "45 min", "1.5 h"
 */
def formatDuration(milliseconds: Long): String = {
  val seconds = milliseconds.toDouble / 1000
  val minutes = seconds / 60
  val hours = minutes / 60
  // Single if/else-if expression instead of early `return`s: `return` is
  // discouraged in Scala (inside closures it is a non-local return implemented
  // by throwing), and the expression form makes the threshold ladder explicit.
  if (milliseconds < 100) {
    "%d ms".format(milliseconds)
  } else if (seconds < 1) {
    "%.1f s".format(seconds)
  } else if (seconds < 60) {
    "%.0f s".format(seconds)
  } else if (minutes < 10) {
    "%.1f min".format(minutes)
  } else if (minutes < 60) {
    "%.0f min".format(minutes)
  } else {
    "%.1f h".format(hours)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ case class SessionEvent(
conf: Map[String, String],
startTime: Long,
var endTime: Long = -1L,
var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
var totalOperations: Int = 0,
var sessionRunTime: Long = 0,
wForget marked this conversation as resolved.
Show resolved Hide resolved
var sessionCpuTime: Long = 0) extends KyuubiEvent with SparkListenerEvent {

override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ case class SparkOperationEvent(
exception: Option[Throwable],
sessionId: String,
sessionUser: String,
executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
executionId: Option[Long],
operationRunTime: Option[Long],
wForget marked this conversation as resolved.
Show resolved Hide resolved
operationCpuTime: Option[Long]) extends KyuubiEvent with SparkListenerEvent {

override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
Expand All @@ -79,7 +81,9 @@ case class SparkOperationEvent(
object SparkOperationEvent {
def apply(
operation: SparkOperation,
executionId: Option[Long] = None): SparkOperationEvent = {
executionId: Option[Long] = None,
operationRunTime: Option[Long] = None,
operationCpuTime: Option[Long] = None): SparkOperationEvent = {
val session = operation.getSession
val status = operation.getStatus
new SparkOperationEvent(
Expand All @@ -94,6 +98,8 @@ object SparkOperationEvent {
status.exception,
session.handle.identifier.toString,
session.user,
executionId)
executionId,
operationRunTime,
operationCpuTime)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
Expand Down Expand Up @@ -124,7 +125,20 @@ abstract class SparkOperation(session: Session)
/**
 * Transitions the operation to `newState` and, when event reporting is
 * enabled, posts a [[SparkOperationEvent]] carrying the execution id plus the
 * executor run/CPU time accumulated so far by the operation listener.
 * On a terminal state, the accumulated times are logged and rolled up into
 * the owning session's totals.
 */
override protected def setState(newState: OperationState): Unit = {
  super.setState(newState)
  if (eventEnabled) {
    EventBus.post(SparkOperationEvent(
      this,
      operationListener.flatMap(_.getExecutionId),
      operationListener.map(_.operationRunTime.get()),
      operationListener.map(_.operationCpuTime.get())))
    if (OperationState.isTerminal(newState)) {
      operationListener.foreach { listener =>
        // Read each counter once so the logged values and the values added to
        // the session totals cannot diverge under a concurrent update.
        val runTime = listener.operationRunTime.get()
        val cpuTime = listener.operationCpuTime.get()
        // Run time is formatted as milliseconds, CPU time as nanoseconds,
        // matching how SQLOperationListener accumulates stage task metrics.
        info(s"statementId=${statementId}, " +
          s"operationRunTime=${KyuubiSparkUtil.formatDuration(runTime)}, " +
          s"operationCpuTime=${KyuubiSparkUtil.formatDurationNano(cpuTime)}")
        // Cast once instead of per call; engine sessions are SparkSessionImpl.
        val sparkSession = session.asInstanceOf[SparkSessionImpl]
        sparkSession.increaseRunTime(runTime)
        sparkSession.increaseCpuTime(cpuTime)
      }
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package org.apache.kyuubi.engine.spark.session

import java.util.concurrent.atomic.AtomicLong

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{AnalysisException, SparkSession}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
Expand Down Expand Up @@ -110,12 +113,28 @@ class SparkSessionImpl(
}

// Closes the session: logs accumulated executor times, publishes the final
// SessionEvent, then delegates to super.close() and tears down Spark-side state.
override def close(): Unit = {
// Run time is formatted as milliseconds, CPU time as nanoseconds (see the
// formatDuration / formatDurationNano calls).
info(s"sessionId=${sessionEvent.sessionId}, " +
s"sessionRunTime=${KyuubiSparkUtil.formatDuration(sessionRunTime.get())}, " +
s"sessionCpuTime=${KyuubiSparkUtil.formatDurationNano(sessionCpuTime.get())}")
cxzl25 marked this conversation as resolved.
Show resolved Hide resolved
// Snapshot the counters into the event before posting; the event is posted
// exactly once, at close time.
sessionEvent.endTime = System.currentTimeMillis()
sessionEvent.sessionRunTime = sessionRunTime.get()
sessionEvent.sessionCpuTime = sessionCpuTime.get()
EventBus.post(sessionEvent)
super.close()
// Drop cached temp views, then release per-session resources held by the
// operation manager (Scala ILoop and Python worker process) for this handle.
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closePythonProcess(
handle)
}

// Session-wide totals accumulated from completed operations: executor run
// time (milliseconds) and executor CPU time (nanoseconds) — see the
// formatDuration / formatDurationNano calls in close().
val sessionRunTime = new AtomicLong(0)
val sessionCpuTime = new AtomicLong(0)

/** Adds an operation's executor run time (milliseconds) to the session total. */
def increaseRunTime(time: Long): Unit = {
  sessionRunTime.addAndGet(time)
}

/** Adds an operation's executor CPU time (nanoseconds) to the session total. */
def increaseCpuTime(time: Long): Unit = {
  sessionCpuTime.addAndGet(time)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.kyuubi

import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

Expand All @@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY}
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.operation.Operation
Expand Down Expand Up @@ -61,6 +63,9 @@ class SQLOperationListener(
None
}

// Per-operation totals accumulated in onStageCompleted from stage task
// metrics: executor run time (ms) and executor CPU time (ns).
val operationRunTime = new AtomicLong(0)
val operationCpuTime = new AtomicLong(0)

def getExecutionId: Option[Long] = executionId

// For broadcast, Spark will introduce a new runId as SPARK_JOB_GROUP_ID, see:
Expand Down Expand Up @@ -140,6 +145,14 @@ class SQLOperationListener(
val stageInfo = stageCompleted.stageInfo
val stageId = stageInfo.stageId
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
val taskMetrics = stageInfo.taskMetrics
cxzl25 marked this conversation as resolved.
Show resolved Hide resolved
if (taskMetrics != null) {
info(s"stageId=${stageCompleted.stageInfo.stageId}, " +
s"stageRunTime=${KyuubiSparkUtil.formatDuration(taskMetrics.executorRunTime)}, " +
s"stageCpuTime=${KyuubiSparkUtil.formatDurationNano(taskMetrics.executorCpuTime)}")
XorSum marked this conversation as resolved.
Show resolved Hide resolved
operationRunTime.getAndAdd(taskMetrics.executorRunTime)
operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
}
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
stageInfo.getStatusString match {
Expand Down
Loading