Skip to content

Commit

Permalink
[SPARK-48392][CORE] Also load spark-defaults.conf when provided `--…
Browse files Browse the repository at this point in the history
…properties-file`

### What changes were proposed in this pull request?

Currently if a property file is provided as argument to Spark submit, the `spark-defaults.conf` is ignored. This PR changes the behavior such that `spark-defaults.conf` is still loaded in this scenario, and any Spark configurations that are in the file but not in the input property file will be loaded.

### Why are the changes needed?

Currently if a property file is provided as argument to Spark submit, the `spark-defaults.conf` is ignored. This causes inconveniences for users who want to split Spark configurations into the two. For example, in Spark on K8S users may want to store system wide default settings in `spark-defaults.conf`, while user specified configurations that are more dynamic in an property file and pass to Spark applications via `--properties-file` parameter. Currently this is not possible. See also kubeflow/spark-operator#1321.

### Does this PR introduce _any_ user-facing change?

Yes, now when a property file is provided via `--properties-file`, `spark-defaults.conf` will also be loaded. However, those configurations specified in the former will take precedence over the same configurations in the latter.

### How was this patch tested?

Existing tests and a new test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46709 from sunchao/SPARK-48392.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
  • Loading branch information
sunchao committed May 29, 2024
1 parent e6236af commit eac413a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,28 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
* When this is called, `sparkProperties` is already filled with configs from the latter.
*/
private def mergeDefaultSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
// Honor --conf before the defaults file
// Honor --conf before the specified properties file and defaults file
defaultSparkProperties.foreach { case (k, v) =>
if (!sparkProperties.contains(k)) {
sparkProperties(k) = v
}
}

// Also load properties from `spark-defaults.conf` if they do not exist in the properties file
// and --conf list
val defaultSparkConf = Utils.getDefaultPropertiesFile(env)
Option(defaultSparkConf).foreach { filename =>
val properties = Utils.getPropertiesFromFile(filename)
properties.foreach { case (k, v) =>
if (!sparkProperties.contains(k)) {
sparkProperties(k) = v
}
}
}

if (propertiesFile == null) {
propertiesFile = defaultSparkConf
}
}

/**
Expand Down
80 changes: 52 additions & 28 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,23 @@ class SparkSubmitSuite
}
}

test("SPARK-48392: Allow both spark-defaults.conf and properties file") {
forConfDir(Map("spark.executor.memory" -> "3g")) { path =>
withPropertyFile("spark-conf.properties", Map("spark.executor.cores" -> "16")) { propsFile =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local",
"--properties-file", propsFile,
unusedJar.toString)
val appArgs = new SparkSubmitArguments(args, env = Map("SPARK_CONF_DIR" -> path))
appArgs.executorMemory should be("3g")
appArgs.executorCores should be("16")
}
}
}

test("support glob path") {
withTempDir { tmpJarDir =>
withTempDir { tmpFileDir =>
Expand Down Expand Up @@ -1623,6 +1640,22 @@ class SparkSubmitSuite
}
}

private def withPropertyFile(fileName: String, conf: Map[String, String])(f: String => Unit) = {
withTempDir { tmpDir =>
val props = new java.util.Properties()
val propsFile = File.createTempFile(fileName, "", tmpDir)
val propsOutputStream = new FileOutputStream(propsFile)
try {
conf.foreach { case (k, v) => props.put(k, v) }
props.store(propsOutputStream, "")
} finally {
propsOutputStream.close()
}

f(propsFile.getPath)
}
}

private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
conf.set("fs.s3a.impl.disable.cache", "true")
Expand Down Expand Up @@ -1694,40 +1727,31 @@ class SparkSubmitSuite
val infixDelimFromFile = s"${delimKey}infixDelimFromFile" -> s"${CR}blah${LF}"
val nonDelimSpaceFromFile = s"${delimKey}nonDelimSpaceFromFile" -> " blah\f"

val testProps = Seq(leadingDelimKeyFromFile, trailingDelimKeyFromFile, infixDelimFromFile,
val testProps = Map(leadingDelimKeyFromFile, trailingDelimKeyFromFile, infixDelimFromFile,
nonDelimSpaceFromFile)

val props = new java.util.Properties()
val propsFile = File.createTempFile("test-spark-conf", ".properties",
Utils.createTempDir())
val propsOutputStream = new FileOutputStream(propsFile)
try {
testProps.foreach { case (k, v) => props.put(k, v) }
props.store(propsOutputStream, "test whitespace")
} finally {
propsOutputStream.close()
}
withPropertyFile("test-spark-conf.properties", testProps) { propsFile =>
val clArgs = Seq(
"--class", "org.SomeClass",
"--conf", s"${lineFeedFromCommandLine._1}=${lineFeedFromCommandLine._2}",
"--conf", "spark.master=yarn",
"--properties-file", propsFile,
"thejar.jar")

val clArgs = Seq(
"--class", "org.SomeClass",
"--conf", s"${lineFeedFromCommandLine._1}=${lineFeedFromCommandLine._2}",
"--conf", "spark.master=yarn",
"--properties-file", propsFile.getPath,
"thejar.jar")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)

val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
Seq(
lineFeedFromCommandLine,
leadingDelimKeyFromFile,
trailingDelimKeyFromFile,
infixDelimFromFile
).foreach { case (k, v) =>
conf.get(k) should be (v)
}

Seq(
lineFeedFromCommandLine,
leadingDelimKeyFromFile,
trailingDelimKeyFromFile,
infixDelimFromFile
).foreach { case (k, v) =>
conf.get(k) should be (v)
conf.get(nonDelimSpaceFromFile._1) should be ("blah")
}

conf.get(nonDelimSpaceFromFile._1) should be ("blah")
}

test("get a Spark configuration from arguments") {
Expand Down
11 changes: 7 additions & 4 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,15 @@ each line consists of a key and a value separated by whitespace. For example:
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer

In addition, a property file with Spark configurations can be passed to `bin/spark-submit` via
the `--properties-file` parameter.

Any values specified as flags or in the properties file will be passed on to the application
and merged with those specified through SparkConf. Properties set directly on the SparkConf
take highest precedence, then flags passed to `spark-submit` or `spark-shell`, then options
in the `spark-defaults.conf` file. A few configuration keys have been renamed since earlier
versions of Spark; in such cases, the older key names are still accepted, but take lower
precedence than any instance of the newer key.
take the highest precedence, then those through `--conf` flags or `--properties-file` passed to
`spark-submit` or `spark-shell`, then options in the `spark-defaults.conf` file. A few
configuration keys have been renamed since earlier versions of Spark; in such cases, the older
key names are still accepted, but take lower precedence than any instance of the newer key.

Spark properties mainly can be divided into two kinds: one is related to deploy, like
"spark.driver.memory", "spark.executor.instances", this kind of properties may not be affected when
Expand Down

0 comments on commit eac413a

Please sign in to comment.