Skip to content

Commit

Permalink
[SPARK-44214][CORE] Support Spark Driver Live Log UI
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to add a `Driver Log` UI tab to provide a better user experience, similar to the `Standalone Cluster` UI.

### Why are the changes needed?

- Since Apache Spark 3.0.0, Spark History Server supports driver logs via `spark.driver.log.persistToDfs.enabled` and `spark.driver.log.dfsDir`.
- For Apache Spark 4.0.0, this PR aims to support a `Live Driver Log UI` page in the Spark Driver UI (port 4040) via `spark.driver.log.localDir`. This means users can view the live driver log through the UI, as shown below.

**SPARK-SHELL**
```
$ bin/spark-shell -c spark.driver.log.localDir=/tmp/logs
```
![Screenshot 2023-08-21 at 4 37 33 PM](https://github.com/apache/spark/assets/9700541/d08c452a-9b7d-49de-a2b2-54ad86e989cb)

**K8S**
```
bin/spark-submit \
--master k8s://$K8S_MASTER \
--deploy-mode cluster \
--name log \
-c spark.executor.instances=10 \
-c spark.driver.log.localDir=file://tmp/ \
-c spark.kubernetes.driver.master=$K8S_MASTER \
-c spark.kubernetes.namespace=$NAMESPACE \
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes apache#42542 from dongjoon-hyun/SPARK-44214.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Aug 22, 2023
1 parent 4646991 commit b137b28
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ package object config {
"Ensure that memory overhead is a double greater than 0")
.createWithDefault(0.1)

// Optional local directory for driver logs. When set, driver logs are written
// locally and the live "Driver Log" tab is enabled in the Spark UI.
private[spark] val DRIVER_LOG_LOCAL_DIR =
  ConfigBuilder("spark.driver.log.localDir")
    .doc("Specifies a local directory to write driver logs and enable Driver Log UI Tab.")
    .version("4.0.0")
    .stringConf
    .createOptional

private[spark] val DRIVER_LOG_DFS_DIR =
ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional

Expand Down
153 changes: 153 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR
import org.apache.spark.util.Utils
import org.apache.spark.util.logging.DriverLogger.DRIVER_LOG_FILE
import org.apache.spark.util.logging.RollingFileAppender

/**
 * Live Spark Driver Log UI Page.
 *
 * Serves byte ranges of the driver's local log files from the directory configured
 * by `spark.driver.log.localDir`. This is similar to the Spark worker's LogPage class.
 *
 * @param parent the [[DriverLogTab]] this page is attached to
 * @param conf the Spark configuration; `spark.driver.log.localDir` must be set
 */
private[ui] class DriverLogPage(
    parent: DriverLogTab,
    conf: SparkConf)
  extends WebUIPage("") with Logging {
  // Fail fast at construction time if the required log directory is not configured.
  require(conf.get(DRIVER_LOG_LOCAL_DIR).nonEmpty, s"Please specify ${DRIVER_LOG_LOCAL_DIR.key}.")

  // Only these file names may be requested through the `logType` query parameter.
  private val supportedLogTypes = Set(DRIVER_LOG_FILE, "stderr", "stdout")
  // Default number of bytes returned per request (100 KiB).
  private val defaultBytes = 100 * 1024
  private val logDir = conf.get(DRIVER_LOG_LOCAL_DIR).get

  /**
   * Renders the HTML log page: a byte-range snippet of the requested log plus
   * "Load More"/"Load New" controls wired to `initLogPage` in `/static/utils.js`.
   */
  def render(request: HttpServletRequest): Seq[Node] = {
    // Query parameters: log type (defaults to the driver log file), starting byte
    // offset, and number of bytes to display.
    val logType = Option(request.getParameter("logType")).getOrElse(DRIVER_LOG_FILE)
    val offset = Option(request.getParameter("offset")).map(_.toLong)
    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt)
      .getOrElse(defaultBytes)
    val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
    val curLogLength = endByte - startByte
    // Summary line showing which byte range of the log is currently displayed.
    val range =
      <span id="log-data">
        Showing {curLogLength} Bytes: {startByte.toString} - {endByte.toString} of {logLength}
      </span>

    // Loads the preceding chunk of the log (older content).
    val moreButton =
      <button type="button" onclick={"loadMore()"} class="log-more-btn btn btn-secondary">
        Load More
      </button>

    // Loads content appended after the current range (newer content).
    val newButton =
      <button type="button" onclick={"loadNew()"} class="log-new-btn btn btn-secondary">
        Load New
      </button>

    // Shown by the page JS when there is nothing newer to load.
    val alert =
      <div class="no-new-alert alert alert-info" style="display: none;">
        End of Log
      </div>

    // Hook for incremental loading: initLogPage drives the buttons above.
    val logParams = "/?logType=%s".format(logType)
    val jsOnload = "window.onload = " +
      s"initLogPage('$logParams', $curLogLength, $startByte, $endByte, $logLength, $byteLength);"

    val content =
      <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
      <div>
        Logs at {logDir}
        {range}
        <div class="log-content" style="height:80vh; overflow:auto; padding:5px;">
          <div>{moreButton}</div>
          <pre>{logText}</pre>
          {alert}
          <div>{newButton}</div>
        </div>
        <script>{Unparsed(jsOnload)}</script>
      </div>

    UIUtils.headerSparkPage(request, "Logs", content, parent)
  }

  /**
   * Renders the requested log range as plain text. Used by the `/log` servlet
   * handler attached in `SparkUI` alongside this tab.
   */
  def renderLog(request: HttpServletRequest): String = {
    val logType = Option(request.getParameter("logType")).getOrElse(DRIVER_LOG_FILE)
    val offset = Option(request.getParameter("offset")).map(_.toLong)
    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt)
      .getOrElse(defaultBytes)

    val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
    // NOTE(review): the header glues `$logDir$logType` together with no separator;
    // this assumes logDir ends with a path separator — confirm against callers.
    val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n"
    pre + logText
  }

  /**
   * Get the part of the log files given the offset and desired length of bytes.
   *
   * @param logDirectory directory containing the log files
   * @param logType requested log file name; must be in `supportedLogTypes`
   * @param offsetOption starting byte offset; when absent, the tail of the log is shown
   * @param byteLength maximum number of bytes to return
   * @return (log text, start byte, end byte, total log length); on error the text
   *         carries the message and the numeric fields are all zero
   */
  private def getLog(
      logDirectory: String,
      logType: String,
      offsetOption: Option[Long],
      byteLength: Int
    ): (String, Long, Long, Long) = {

    if (!supportedLogTypes.contains(logType)) {
      // Reject unknown log types instead of reading arbitrary files from the directory.
      return ("Error: Log type must be one of " + supportedLogTypes.mkString(", "), 0, 0, 0)
    }

    try {
      // Include rolled-over files so the requested byte range can span multiple files.
      val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
      logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")

      val fileLengths: Seq[Long] = files.map(Utils.getFileLength(_, conf))
      val totalLength = fileLengths.sum
      // Default to the tail: the last `byteLength` bytes of the combined logs.
      val offset = offsetOption.getOrElse(totalLength - byteLength)
      // Clamp the start index into [0, totalLength].
      val startIndex = {
        if (offset < 0) {
          0L
        } else if (offset > totalLength) {
          totalLength
        } else {
          offset
        }
      }
      val endIndex = math.min(startIndex + byteLength, totalLength)
      logDebug(s"Getting log from $startIndex to $endIndex")
      val logText = Utils.offsetBytes(files, fileLengths, startIndex, endIndex)
      logDebug(s"Got log of length ${logText.length} bytes")
      (logText, startIndex, endIndex, totalLength)
    } catch {
      case e: Exception =>
        // Surface the failure in the page body rather than breaking the UI.
        logError(s"Error getting $logType logs from directory $logDirectory", e)
        ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
    }
  }
}

/**
 * Live Spark Driver Log UI Tab.
 *
 * Hosts a single [[DriverLogPage]] under the "logs" prefix of the Spark UI.
 */
private[ui] class DriverLogTab(parent: SparkUI) extends SparkUITab(parent, "logs") {
  // Constructed eagerly: DriverLogPage's `require` throws here if
  // spark.driver.log.localDir is not set.
  private val page = new DriverLogPage(this, parent.conf)
  attachPage(page)

  // Exposes the page so SparkUI can route plain-text "/log" requests to renderLog.
  def getPage: DriverLogPage = page
}
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR
import org.apache.spark.internal.config.UI._
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
Expand Down Expand Up @@ -100,6 +101,13 @@ private[spark] class SparkUI private (
attachTab(stagesTab)
attachTab(new StorageTab(this, store))
attachTab(new EnvironmentTab(this, store))
if (sc.map(_.conf.get(DRIVER_LOG_LOCAL_DIR).nonEmpty).getOrElse(false)) {
val driverLogTab = new DriverLogTab(this)
attachTab(driverLogTab)
attachHandler(createServletHandler("/log",
(request: HttpServletRequest) => driverLogTab.getPage.renderLog(request),
sc.get.conf))
}
attachTab(new ExecutorsTab(this))
addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging {
private val DEFAULT_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex"
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

private val localLogFile: String = FileUtils.getFile(
private val localLogFile: String = conf.get(DRIVER_LOG_LOCAL_DIR).map {
FileUtils.getFile(_, DriverLogger.DRIVER_LOG_FILE).getAbsolutePath()
}.getOrElse(FileUtils.getFile(
Utils.getLocalDir(conf),
DriverLogger.DRIVER_LOG_DIR,
DriverLogger.DRIVER_LOG_FILE).getAbsolutePath()
DriverLogger.DRIVER_LOG_FILE).getAbsolutePath())
private var writer: Option[DfsAsyncWriter] = None

addLogAppender()
Expand Down Expand Up @@ -210,7 +212,9 @@ private[spark] object DriverLogger extends Logging {
val APPENDER_NAME = "_DriverLogAppender"

def apply(conf: SparkConf): Option[DriverLogger] = {
if (conf.get(DRIVER_LOG_PERSISTTODFS) && Utils.isClientMode(conf)) {
val localDriverLogEnabled = conf.get(DRIVER_LOG_LOCAL_DIR).nonEmpty
if (conf.get(DRIVER_LOG_PERSISTTODFS) && Utils.isClientMode(conf)
|| localDriverLogEnabled) {
if (conf.contains(DRIVER_LOG_DFS_DIR)) {
try {
Some(new DriverLogger(conf))
Expand All @@ -219,6 +223,11 @@ private[spark] object DriverLogger extends Logging {
logError("Could not add driver logger", e)
None
}
} else if (localDriverLogEnabled) {
// Driver Logger is started only for Spark Driver Log UI Tab
new DriverLogger(conf)
// Return None because we don't need DFS-related logic in SparkContext and DfsAsyncWriter
None
} else {
logWarning(s"Driver logs are not persisted because" +
s" ${DRIVER_LOG_DFS_DIR.key} is not configured")
Expand Down
65 changes: 65 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/DriverLogPageSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import java.io.File
import java.nio.charset.StandardCharsets
import javax.servlet.http.HttpServletRequest

import org.apache.commons.io.FileUtils
import org.mockito.Mockito.{mock, when}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR


class DriverLogPageSuite extends SparkFunSuite {

  test("DriverLogTab requires driver log location") {
    // A SparkUI whose conf lacks spark.driver.log.localDir must fail tab creation.
    val ui = mock(classOf[SparkUI])
    when(ui.conf).thenReturn(new SparkConf(false))

    val error = intercept[IllegalArgumentException] {
      new DriverLogTab(ui)
    }
    assert(error.getMessage.contains(s"Please specify ${DRIVER_LOG_LOCAL_DIR.key}"))
  }

  test("DriverLogPage requires driver log location") {
    // The page itself enforces the same precondition on a bare conf.
    val error = intercept[IllegalArgumentException] {
      new DriverLogPage(null, new SparkConf(false))
    }
    assert(error.getMessage.contains(s"Please specify ${DRIVER_LOG_LOCAL_DIR.key}"))
  }

  test("renderLog reads driver.log file") {
    withTempDir { dir =>
      // Point the page at a temp directory containing a known driver.log payload.
      val conf = new SparkConf(false).set(DRIVER_LOG_LOCAL_DIR, dir.getCanonicalPath)
      val page = new DriverLogPage(null, conf)
      FileUtils.writeStringToFile(
        new File(dir, "driver.log"), "driver log content", StandardCharsets.UTF_8)

      val rendered = page.renderLog(mock(classOf[HttpServletRequest]))
      // "driver log content" is 18 bytes, so the header reports the full range.
      assert(rendered.startsWith("==== Bytes 0-18 of 18 of"))
      assert(rendered.contains("driver.log"))
      assert(rendered.contains("driver log content"))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
assert(dfsFileStatus.getLen > 0)
}

test("SPARK-44214: DriverLogger.apply returns None when only spark.driver.log.localDir exists") {
  // Without DFS persistence configured, DriverLogger.apply yields no DFS writer —
  // both before and after the local log dir is set (SparkConf.set mutates in place).
  val conf = new SparkConf()
  assert(DriverLogger(conf).isEmpty)
  conf.set(DRIVER_LOG_LOCAL_DIR, "file://tmp/")
  assert(DriverLogger(conf).isEmpty)
}

private def getSparkContext(): SparkContext = {
getSparkContext(new SparkConf())
}
Expand Down

0 comments on commit b137b28

Please sign in to comment.