Skip to content

Commit

Permalink
[KYUUBI #6107] [Spark] Collect and summarize the executorRunTime an…
Browse files Browse the repository at this point in the history
…d `executorCpuTime` of the statement

# 🔍 Description
## Issue References 🔗

This pull request fixes #6107

## Describe Your Solution 🔧

The total execution time of a statement (or a session) is the summary of the execution time of the stages belonging to the statement (or session).
The total execution time of a stage is collected from `SQLOperationListener#onStageCompleted`.
The total execution times of the statement or a session are stored in the engine events or output to the log.

<img width="962" alt="截屏2024-02-29 14 47 50" src="https://github.com/apache/kyuubi/assets/23011702/176df1db-bb20-428b-94b8-fa02c946fde2">
<img width="1143" alt="截屏2024-02-29 14 47 21" src="https://github.com/apache/kyuubi/assets/23011702/8cfc6a72-f6e8-45b6-bdda-30296c94c893">

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6112 from XorSum/features/spark-engine-cpu-time-collect.

Closes #6107

8028005 [bkhan] check same group
d9efa2d [bkhan] formatDuration
a8841cd [bkhan] update
2507159 [bkhan] Apply suggestions from code review
cfed2b9 [bkhan] use formatDurationVerbose
444d4aa [bkhan] Collect and summarize the executorRunTime and executorCpuTime of the statement

Authored-by: bkhan <[email protected]>
Signed-off-by: Shaoyun Chen <[email protected]>
  • Loading branch information
XorSum authored and cxzl25 committed Mar 4, 2024
1 parent 9f53a09 commit 3bc28fd
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ case class SessionEvent(
conf: Map[String, String],
startTime: Long,
var endTime: Long = -1L,
var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
var totalOperations: Int = 0,
var sessionRunTime: Long = 0,
var sessionCpuTime: Long = 0) extends KyuubiEvent with SparkListenerEvent {

override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ case class SparkOperationEvent(
exception: Option[Throwable],
sessionId: String,
sessionUser: String,
executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
executionId: Option[Long],
operationRunTime: Option[Long],
operationCpuTime: Option[Long]) extends KyuubiEvent with SparkListenerEvent {

override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
Expand All @@ -79,7 +81,9 @@ case class SparkOperationEvent(
object SparkOperationEvent {
def apply(
operation: SparkOperation,
executionId: Option[Long] = None): SparkOperationEvent = {
executionId: Option[Long] = None,
operationRunTime: Option[Long] = None,
operationCpuTime: Option[Long] = None): SparkOperationEvent = {
val session = operation.getSession
val status = operation.getStatus
new SparkOperationEvent(
Expand All @@ -94,6 +98,8 @@ object SparkOperationEvent {
status.exception,
session.handle.identifier.toString,
session.user,
executionId)
executionId,
operationRunTime,
operationCpuTime)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import org.apache.spark.ui.SparkUIUtilsHelper.formatDuration

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
Expand Down Expand Up @@ -124,7 +125,20 @@ abstract class SparkOperation(session: Session)
override protected def setState(newState: OperationState): Unit = {
super.setState(newState)
if (eventEnabled) {
EventBus.post(SparkOperationEvent(this, operationListener.flatMap(_.getExecutionId)))
EventBus.post(SparkOperationEvent(
this,
operationListener.flatMap(_.getExecutionId),
operationListener.map(_.getOperationRunTime),
operationListener.map(_.getOperationCpuTime)))
if (OperationState.isTerminal(newState)) {
operationListener.foreach(l => {
info(s"statementId=${statementId}, " +
s"operationRunTime=${formatDuration(l.getOperationRunTime)}, " +
s"operationCpuTime=${formatDuration(l.getOperationCpuTime / 1000000)}")
session.asInstanceOf[SparkSessionImpl].increaseRunTime(l.getOperationRunTime)
session.asInstanceOf[SparkSessionImpl].increaseCpuTime(l.getOperationCpuTime)
})
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.kyuubi.engine.spark.session

import java.util.concurrent.atomic.AtomicLong

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.ui.SparkUIUtilsHelper.formatDuration

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
Expand All @@ -43,6 +46,8 @@ class SparkSessionImpl(

override val handle: SessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
private val sessionRunTime = new AtomicLong(0)
private val sessionCpuTime = new AtomicLong(0)

private def setModifiableConfig(key: String, value: String): Unit = {
try {
Expand Down Expand Up @@ -110,12 +115,25 @@ class SparkSessionImpl(
}

override def close(): Unit = {
info(s"sessionId=${sessionEvent.sessionId}, " +
s"sessionRunTime=${formatDuration(sessionRunTime.get())}, " +
s"sessionCpuTime=${formatDuration(sessionCpuTime.get() / 1000000)}")
sessionEvent.endTime = System.currentTimeMillis()
sessionEvent.sessionRunTime = sessionRunTime.get()
sessionEvent.sessionCpuTime = sessionCpuTime.get()
EventBus.post(sessionEvent)
super.close()
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closePythonProcess(
handle)
}

def increaseRunTime(time: Long): Unit = {
sessionRunTime.getAndAdd(time)
}

def increaseCpuTime(time: Long): Unit = {
sessionCpuTime.getAndAdd(time)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package org.apache.spark.kyuubi

import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import org.apache.spark.scheduler._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
import org.apache.spark.ui.UIUtils.formatDuration

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
Expand Down Expand Up @@ -61,6 +63,13 @@ class SQLOperationListener(
None
}

private val operationRunTime = new AtomicLong(0)
private val operationCpuTime = new AtomicLong(0)

def getOperationRunTime: Long = operationRunTime.get()

def getOperationCpuTime: Long = operationCpuTime.get()

def getExecutionId: Option[Long] = executionId

// For broadcast, Spark will introduce a new runId as SPARK_JOB_GROUP_ID, see:
Expand Down Expand Up @@ -150,6 +159,14 @@ class SQLOperationListener(
}
}
}
val taskMetrics = stageInfo.taskMetrics
if (taskMetrics != null) {
info(s"stageId=${stageCompleted.stageInfo.stageId}, " +
s"stageRunTime=${formatDuration(taskMetrics.executorRunTime)}, " +
s"stageCpuTime=${formatDuration(taskMetrics.executorCpuTime / 1000000)}")
operationRunTime.getAndAdd(taskMetrics.executorRunTime)
operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
}
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

/**
* A place to invoke non-public APIs of [[UIUtils]], anything to be added here need to
* think twice
*/
object SparkUIUtilsHelper {

def formatDuration(ms: Long): String = {
UIUtils.formatDuration(ms)
}
}

0 comments on commit 3bc28fd

Please sign in to comment.