[SPARK-47579][CORE][PART2] Migrate logInfo with variables to structured logging framework

### What changes were proposed in this pull request?

This PR migrates `logInfo` calls with variables in the Core module to the structured logging framework.
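For reference, the change applies one mechanical pattern throughout: an interpolated `s"..."` message becomes a `log"..."` message whose variables are wrapped in `MDC` with a key from `LogKeys`, so each value can be emitted as a structured field. A minimal sketch of the before/after shape (the `BroadcastReader` class is hypothetical and only illustrates the pattern; the second `logInfo` form is what this PR introduces):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.BROADCAST_ID

// Hypothetical class used only to illustrate the migration pattern.
class BroadcastReader(id: Long) extends Logging {
  def report(): Unit = {
    // Before: plain string interpolation; the variable is baked into the message text.
    logInfo(s"Started reading broadcast variable $id")

    // After: the variable is wrapped in MDC with a LogKey, so structured logging
    // can record it as a separate field while the rendered message stays the same.
    logInfo(log"Started reading broadcast variable ${MDC(BROADCAST_ID, id)}")
  }
}
```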

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Pass GA (GitHub Actions) checks.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46494 from zeotuan/coreInfo2.

Lead-authored-by: Tuan Pham <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
zeotuan and gengliangwang committed May 14, 2024
1 parent 91da2ca commit 79aeae1
Showing 36 changed files with 260 additions and 163 deletions.
33 changes: 33 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -85,6 +85,7 @@ object LogKeys {
case object CLASS_LOADER extends LogKey
case object CLASS_NAME extends LogKey
case object CLASS_PATH extends LogKey
case object CLASS_PATHS extends LogKey
case object CLAUSES extends LogKey
case object CLEANUP_LOCAL_DIRS extends LogKey
case object CLUSTER_CENTROIDS extends LogKey
@@ -122,6 +123,7 @@ object LogKeys {
case object COST extends LogKey
case object COUNT extends LogKey
case object CREATED_POOL_NAME extends LogKey
case object CREDENTIALS_RENEWAL_INTERVAL_RATIO extends LogKey
case object CROSS_VALIDATION_METRIC extends LogKey
case object CROSS_VALIDATION_METRICS extends LogKey
case object CSV_HEADER_COLUMN_NAME extends LogKey
@@ -215,6 +217,7 @@ object LogKeys {
case object FALLBACK_VERSION extends LogKey
case object FEATURE_COLUMN extends LogKey
case object FEATURE_DIMENSION extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
case object FILE_END_OFFSET extends LogKey
@@ -226,6 +229,7 @@ object LogKeys {
case object FILE_NAME2 extends LogKey
case object FILE_NAME3 extends LogKey
case object FILE_START_OFFSET extends LogKey
case object FILE_SYSTEM extends LogKey
case object FILE_VERSION extends LogKey
case object FILTER extends LogKey
case object FILTERS extends LogKey
@@ -253,6 +257,7 @@ object LogKeys {
case object HIVE_OPERATION_STATE extends LogKey
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
case object HOST_NAMES extends LogKey
case object HOST_PORT extends LogKey
case object HOST_PORT2 extends LogKey
case object HUGE_METHOD_LIMIT extends LogKey
@@ -274,7 +279,9 @@ object LogKeys {
case object IS_NETWORK_REQUEST_DONE extends LogKey
case object JAR_ENTRY extends LogKey
case object JAR_MESSAGE extends LogKey
case object JAR_URL extends LogKey
case object JAVA_VERSION extends LogKey
case object JAVA_VM_NAME extends LogKey
case object JOB_ID extends LogKey
case object JOIN_CONDITION extends LogKey
case object JOIN_CONDITION_SUB_EXPR extends LogKey
@@ -283,6 +290,7 @@ object LogKeys {
case object KAFKA_PULLS_COUNT extends LogKey
case object KAFKA_RECORDS_PULLED_COUNT extends LogKey
case object KEY extends LogKey
case object KEY2 extends LogKey
case object KEYTAB extends LogKey
case object KEYTAB_FILE extends LogKey
case object LABEL_COLUMN extends LogKey
@@ -360,18 +368,23 @@ object LogKeys {
case object NEW_RDD_ID extends LogKey
case object NEW_STATE extends LogKey
case object NEW_VALUE extends LogKey
case object NEXT_RENEWAL_TIME extends LogKey
case object NODES extends LogKey
case object NODE_LOCATION extends LogKey
case object NON_BUILT_IN_CONNECTORS extends LogKey
case object NORM extends LogKey
case object NUM_ADDED_MASTERS extends LogKey
case object NUM_ADDED_PARTITIONS extends LogKey
case object NUM_ADDED_WORKERS extends LogKey
case object NUM_APPS extends LogKey
case object NUM_BIN extends LogKey
case object NUM_BLOCK_IDS extends LogKey
case object NUM_BROADCAST_BLOCK extends LogKey
case object NUM_BYTES extends LogKey
case object NUM_BYTES_CURRENT extends LogKey
case object NUM_BYTES_EVICTED extends LogKey
case object NUM_BYTES_MAX extends LogKey
case object NUM_BYTES_TO_FREE extends LogKey
case object NUM_BYTES_TO_WARN extends LogKey
case object NUM_BYTES_USED extends LogKey
case object NUM_CHUNKS extends LogKey
@@ -380,6 +393,7 @@ object LogKeys {
case object NUM_CONCURRENT_WRITER extends LogKey
case object NUM_CORES extends LogKey
case object NUM_DATA_FILES extends LogKey
case object NUM_DRIVERS extends LogKey
case object NUM_DROPPED_PARTITIONS extends LogKey
case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
case object NUM_EVENTS extends LogKey
@@ -400,17 +414,20 @@ object LogKeys {
case object NUM_LOADED_ENTRIES extends LogKey
case object NUM_LOCAL_DIRS extends LogKey
case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey
case object NUM_MERGER_LOCATIONS extends LogKey
case object NUM_META_FILES extends LogKey
case object NUM_PARTITIONS extends LogKey
case object NUM_PARTITIONS2 extends LogKey
case object NUM_PARTITION_VALUES extends LogKey
case object NUM_PATHS extends LogKey
case object NUM_PEERS extends LogKey
case object NUM_PEERS_REPLICATED_TO extends LogKey
case object NUM_PEERS_TO_REPLICATE_TO extends LogKey
case object NUM_PENDING_LAUNCH_TASKS extends LogKey
case object NUM_POINT extends LogKey
case object NUM_PREFIXES extends LogKey
case object NUM_PRUNED extends LogKey
case object NUM_REMOVED_WORKERS extends LogKey
case object NUM_REPLICAS extends LogKey
case object NUM_REQUESTS extends LogKey
case object NUM_REQUEST_SYNC_TASK extends LogKey
@@ -477,6 +494,7 @@ object LogKeys {
case object POST_SCAN_FILTERS extends LogKey
case object PREDICATE extends LogKey
case object PREDICATES extends LogKey
case object PREFIX extends LogKey
case object PRETTY_ID_STRING extends LogKey
case object PRINCIPAL extends LogKey
case object PROCESS extends LogKey
@@ -488,6 +506,7 @@ object LogKeys {
case object PUSHED_FILTERS extends LogKey
case object PVC_METADATA_NAME extends LogKey
case object PYTHON_EXEC extends LogKey
case object PYTHON_PACKAGES extends LogKey
case object PYTHON_VERSION extends LogKey
case object PYTHON_WORKER_MODULE extends LogKey
case object PYTHON_WORKER_RESPONSE extends LogKey
@@ -512,6 +531,7 @@ object LogKeys {
case object RECEIVER_ID extends LogKey
case object RECEIVER_IDS extends LogKey
case object RECORDS extends LogKey
case object RECOVERY_STATE extends LogKey
case object REDACTED_STATEMENT extends LogKey
case object REDUCE_ID extends LogKey
case object REGISTERED_EXECUTOR_FILE extends LogKey
@@ -541,12 +561,14 @@ object LogKeys {
case object RMSE extends LogKey
case object ROCKS_DB_LOG_LEVEL extends LogKey
case object ROCKS_DB_LOG_MESSAGE extends LogKey
case object RPC_ADDRESS extends LogKey
case object RPC_ENDPOINT_REF extends LogKey
case object RPC_MESSAGE_CAPACITY extends LogKey
case object RULE_BATCH_NAME extends LogKey
case object RULE_NAME extends LogKey
case object RULE_NUMBER_OF_RUNS extends LogKey
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEMA extends LogKey
case object SCHEMA2 extends LogKey
@@ -561,6 +583,8 @@ object LogKeys {
case object SET_CLIENT_INFO_REQUEST extends LogKey
case object SHARD_ID extends LogKey
case object SHELL_COMMAND extends LogKey
case object SHELL_OPTIONS extends LogKey
case object SHORT_USER_NAME extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
case object SHUFFLE_DB_BACKEND_NAME extends LogKey
@@ -578,8 +602,13 @@ object LogKeys {
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
case object SOURCE_PATH extends LogKey
case object SPARK_BRANCH extends LogKey
case object SPARK_BUILD_DATE extends LogKey
case object SPARK_BUILD_USER extends LogKey
case object SPARK_DATA_STREAM extends LogKey
case object SPARK_PLAN_ID extends LogKey
case object SPARK_REPO_URL extends LogKey
case object SPARK_REVISION extends LogKey
case object SPARK_VERSION extends LogKey
case object SPILL_TIMES extends LogKey
case object SQL_TEXT extends LogKey
@@ -646,6 +675,7 @@ object LogKeys {
case object TOKEN extends LogKey
case object TOKEN_KIND extends LogKey
case object TOKEN_REGEX extends LogKey
case object TOKEN_RENEWER extends LogKey
case object TOPIC extends LogKey
case object TOPIC_PARTITION extends LogKey
case object TOPIC_PARTITIONS extends LogKey
@@ -667,12 +697,15 @@ object LogKeys {
case object UI_FILTER extends LogKey
case object UI_FILTER_PARAMS extends LogKey
case object UI_PROXY_BASE extends LogKey
case object UNKNOWN_PARAM extends LogKey
case object UNSUPPORTED_EXPR extends LogKey
case object UNSUPPORTED_HINT_REASON extends LogKey
case object UNTIL_OFFSET extends LogKey
case object UPPER_BOUND extends LogKey
case object URI extends LogKey
case object URIS extends LogKey
case object URL extends LogKey
case object URL2 extends LogKey
case object USER_ID extends LogKey
case object USER_NAME extends LogKey
case object UUID extends LogKey
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -29,7 +29,7 @@ import scala.sys.process.Process
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{PATH, PYTHON_VERSION}
import org.apache.spark.internal.LogKeys.{PATH, PYTHON_PACKAGES, PYTHON_VERSION}
import org.apache.spark.util.ArrayImplicits.SparkArrayOps
import org.apache.spark.util.Utils

@@ -147,7 +147,8 @@ private[spark] object PythonUtils extends Logging {
def formatOutput(output: String): String = {
output.replaceAll("\\s+", ", ")
}
listOfPackages.foreach(x => logInfo(s"List of Python packages :- ${formatOutput(x)}"))
listOfPackages.foreach(x => logInfo(log"List of Python packages :-" +
log" ${MDC(PYTHON_PACKAGES, formatOutput(x))}"))
}
}

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -28,7 +28,7 @@ import scala.util.Random

import org.apache.spark._
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKeys.BROADCAST_ID
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
@@ -278,11 +278,12 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
}
case None =>
val estimatedTotalSize = Utils.bytesToString(numBlocks.toLong * blockSize)
logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +
s"(estimated total size $estimatedTotalSize)")
logInfo(log"Started reading broadcast variable ${MDC(BROADCAST_ID, id)} with ${MDC(NUM_BROADCAST_BLOCK, numBlocks)} pieces " +
log"(estimated total size ${MDC(NUM_BYTES, estimatedTotalSize)})")
val startTimeNs = System.nanoTime()
val blocks = readBlocks()
logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")
logInfo(log"Reading broadcast variable ${MDC(BROADCAST_ID, id)}" +
log" took ${MDC(TOTAL_TIME, Utils.getUsedTimeNs(startTimeNs))}")

try {
val obj = TorrentBroadcast.unBlockifyObject[T](
35 changes: 20 additions & 15 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -142,12 +142,14 @@ private[spark] class SparkSubmit extends Logging {
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
logInfo("Using Scala %s, %s, %s".format(
Properties.versionString, Properties.javaVmName, Properties.javaVersion))
logInfo(s"Branch $SPARK_BRANCH")
logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
logInfo(s"Revision $SPARK_REVISION")
logInfo(s"Url $SPARK_REPO_URL")
logInfo(log"Using Scala ${MDC(LogKeys.SCALA_VERSION, Properties.versionString)}," +
log" ${MDC(LogKeys.JAVA_VM_NAME, Properties.javaVmName)}," +
log" ${MDC(LogKeys.JAVA_VERSION, Properties.javaVersion)}")
logInfo(log"Branch ${MDC(LogKeys.SPARK_BRANCH, SPARK_BRANCH)}")
logInfo(log"Compiled by user ${MDC(LogKeys.SPARK_BUILD_USER, SPARK_BUILD_USER)} on" +
log" ${MDC(LogKeys.SPARK_BUILD_DATE, SPARK_BUILD_DATE)}")
logInfo(log"Revision ${MDC(LogKeys.SPARK_REVISION, SPARK_REVISION)}")
logInfo(log"Url ${MDC(LogKeys.SPARK_REPO_URL, SPARK_REPO_URL)}")
logInfo("Type --help for more information.")
}

@@ -438,7 +440,9 @@ private[spark] class SparkSubmit extends Logging {
workingDirectory,
if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
.getCanonicalFile
logInfo(s"Files $resolvedUri from $source to $dest")
logInfo(log"Files ${MDC(LogKeys.URI, resolvedUri)}" +
log" from ${MDC(LogKeys.SOURCE_PATH, source)}" +
log" to ${MDC(LogKeys.DESTINATION_PATH, dest)}")
Utils.deleteRecursively(dest)
if (isArchive) {
Utils.unpack(source, dest)
@@ -921,7 +925,7 @@ private[spark] class SparkSubmit extends Logging {
private def setRMPrincipal(sparkConf: SparkConf): Unit = {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
logInfo(s"Setting ${key} to ${shortUserName}")
logInfo(log"Setting ${MDC(LogKeys.KEY, key)} to ${MDC(LogKeys.SHORT_USER_NAME, shortUserName)}")
sparkConf.set(key, shortUserName)
}

@@ -958,11 +962,12 @@ private[spark] class SparkSubmit extends Logging {
}

if (args.verbose) {
logInfo(s"Main class:\n$childMainClass")
logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
logInfo(log"Main class:\n${MDC(LogKeys.CLASS_NAME, childMainClass)}")
logInfo(log"Arguments:\n${MDC(LogKeys.ARGS, childArgs.mkString("\n"))}")
// sysProps may contain sensitive information, so redact before printing
logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).sorted.mkString("\n")}")
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo(log"Spark config:\n" +
log"${MDC(LogKeys.CONFIG, Utils.redact(sparkConf.getAll.toMap).sorted.mkString("\n"))}")
logInfo(log"Classpath elements:\n${MDC(LogKeys.CLASS_PATHS, childClasspath.mkString("\n"))}")
logInfo("\n")
}
assert(!(args.deployMode == "cluster" && args.proxyUser != null && childClasspath.nonEmpty) ||
@@ -982,18 +987,18 @@ private[spark] class SparkSubmit extends Logging {
case e: ClassNotFoundException =>
logError(log"Failed to load class ${MDC(LogKeys.CLASS_NAME, childMainClass)}.")
if (childMainClass.contains("thriftserver")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo(log"Failed to load main class ${MDC(LogKeys.CLASS_NAME, childMainClass)}.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
} else if (childMainClass.contains("org.apache.spark.sql.connect")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo(log"Failed to load main class ${MDC(LogKeys.CLASS_NAME, childMainClass)}.")
// TODO(SPARK-42375): Should point out the user-facing page here instead.
logInfo("You need to specify Spark Connect jars with --jars or --packages.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
logError(log"Failed to load ${MDC(LogKeys.CLASS_NAME, childMainClass)}", e)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
logInfo(s"Failed to load hive class.")
logInfo("Failed to load hive class.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -78,7 +78,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var principal: String = null
var keytab: String = null
private var dynamicAllocationEnabled: Boolean = false

// Standalone cluster mode only
var supervise: Boolean = false
var driverCores: String = null
@@ -90,7 +89,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
if (verbose) {
logInfo(s"Using properties file: $propertiesFile")
logInfo(log"Using properties file: ${MDC(PATH, propertiesFile)}")
}
Option(propertiesFile).foreach { filename =>
val properties = Utils.getPropertiesFromFile(filename)
@@ -100,7 +99,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Property files may contain sensitive information, so redact before printing
if (verbose) {
Utils.redact(properties).foreach { case (k, v) =>
logInfo(s"Adding default property: $k=$v")
logInfo(log"Adding default property: ${MDC(KEY, k)}=${MDC(VALUE, v)}")
}
}
}
@@ -490,7 +489,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S

private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
if (unknownParam != null) {
logInfo("Unknown/unsupported param " + unknownParam)
logInfo(log"Unknown/unsupported param ${MDC(UNKNOWN_PARAM, unknownParam)}")
}
val command = sys.env.getOrElse("_SPARK_CMD_USAGE",
"""Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
@@ -589,11 +588,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
)

if (SparkSubmit.isSqlShell(mainClass)) {
logInfo("CLI options:")
logInfo(getSqlShellOptions())
logInfo(log"CLI options:\n${MDC(SHELL_OPTIONS, getSqlShellOptions())}")
}

throw new SparkUserAppException(exitCode)
throw SparkUserAppException(exitCode)
}

/**
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -171,19 +171,19 @@ private[history] class ApplicationCache(
*/
@throws[NoSuchElementException]
private def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = {
lazy val application = s"$appId/${attemptId.mkString}"
logDebug(s"Loading application Entry $application")
lazy val application = log"${MDC(APP_ID, appId)}/${MDC(APP_ATTEMPT_ID, attemptId.mkString)}"
logDebug(log"Loading application Entry " + application)
metrics.loadCount.inc()
val loadedUI = time(metrics.loadTimer) {
metrics.lookupCount.inc()
operations.getAppUI(appId, attemptId) match {
case Some(loadedUI) =>
logDebug(s"Loaded application $application")
logDebug(log"Loaded application " + application)
loadedUI
case None =>
metrics.lookupFailureCount.inc()
// guava's cache logs via java.util log, so is of limited use. Hence: our own message
logInfo(s"Failed to load application attempt $application")
logInfo(log"Failed to load application attempt " + application)
throw new NoSuchElementException(s"no application with application Id '$appId'" +
attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id"))
}