Support rolling spark.kubernetes.file.upload.path #6876

Closed · wants to merge 6 commits
docs/configuration/settings.md (1 addition, 0 deletions)
@@ -362,6 +362,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 |
| kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled | false | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. | boolean | 1.11.0 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
| kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
docs/deployment/engine_on_kubernetes.md (27 additions, 0 deletions)
@@ -48,6 +48,33 @@ The minimum required configurations are:
* spark.kubernetes.file.upload.path (path on S3 or HDFS)
* spark.kubernetes.authenticate.driver.serviceAccountName ([viz ServiceAccount](#serviceaccount))

Vanilla Spark supports neither rolling nor expiration for `spark.kubernetes.file.upload.path`. If you use a
file system that does not support TTL, e.g. HDFS, additional cleanup mechanisms are needed to prevent the files in this
directory from growing indefinitely. Since Kyuubi v1.11.0, you can configure `spark.kubernetes.file.upload.path` with
the placeholders `{{YEAR}}`, `{{MONTH}}` and `{{DAY}}`, and enable `kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled`
to let the Kyuubi server create the directory with permission 777 automatically before submitting the Spark application.

Comment on lines +51 to +56
Contributor

It seems that our current implementation does not solve the problem of file growth. Will this issue be solved in subsequent PRs?

Member Author

It adds rolling support for `spark.kubernetes.file.upload.path`, for example,

spark.kubernetes.file.upload.path=hdfs://hadoop-testing/spark-upload-{{YEAR}}{{MONTH}}
hdfs://hadoop-testing/spark-upload-202412
hdfs://hadoop-testing/spark-upload-202501

An admin can safely delete `hdfs://hadoop-testing/spark-upload-202412` after 20250101.

Note that Spark creates a sub-directory named `s"spark-upload-${UUID.randomUUID()}"` under `spark.kubernetes.file.upload.path`
for each upload, so the administrator still needs to clean up the staging directories periodically.

For example, the user can set the following configuration in `kyuubi-defaults.conf` to enable monthly rolling of
`spark.kubernetes.file.upload.path`:

```
kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled=true
spark.kubernetes.file.upload.path=hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}}
```

and the staging files would look like:

```
hdfs://hadoop-cluster/spark-upload-202412/spark-upload-f2b71340-dc1d-4940-89e2-c5fc31614eb4
hdfs://hadoop-cluster/spark-upload-202412/spark-upload-173a8653-4d3e-48c0-b8ab-b7f92ae582d6
hdfs://hadoop-cluster/spark-upload-202501/spark-upload-3b22710f-a4a0-40bb-a3a8-16e481038a63
```

Then the administrator can safely delete `hdfs://hadoop-cluster/spark-upload-202412` after 20250101.
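
Kyuubi does not delete the rolled directories itself, so the periodic cleanup mentioned above remains an administrator task. Below is a minimal sketch of such a job, assuming the monthly layout shown above and a Hadoop-compatible file system; the object name, method name and retention window are illustrative only and are not part of Kyuubi or this PR.

```scala
import java.time.YearMonth
import java.time.format.DateTimeFormatter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object SparkUploadDirCleaner {
  private val MONTH_FMT = DateTimeFormatter.ofPattern("yyyyMM")

  def cleanup(baseDir: String, retentionMonths: Int): Unit = {
    val base = new Path(baseDir)
    val fs = base.getFileSystem(new Configuration())
    // Directories are named spark-upload-yyyyMM, so the zero-padded names sort
    // chronologically as plain strings; everything older than the cutoff can go.
    val cutoff =
      "spark-upload-" + YearMonth.now().minusMonths(retentionMonths).format(MONTH_FMT)
    try {
      fs.listStatus(base)
        .filter(s => s.isDirectory && s.getPath.getName.startsWith("spark-upload-"))
        .filter(_.getPath.getName < cutoff)
        .foreach { s =>
          // recursively remove the expired month partition and its staging sub-dirs
          fs.delete(s.getPath, true)
        }
    } finally {
      fs.close()
    }
  }
}

// e.g. keep the current and previous month:
// SparkUploadDirCleaner.cleanup("hdfs://hadoop-cluster", retentionMonths = 2)
```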

### Docker Image

Spark ships a `./bin/docker-image-tool.sh` script to build and publish the Docker images for running Spark applications on Kubernetes.
@@ -1334,6 +1334,15 @@ object KyuubiConf {
.createWithDefault(
"http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}}")

val KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH: ConfigEntry[Boolean] =
buildConf("kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled")
.doc("If enabled, Kyuubi server will try to create the " +
"`spark.kubernetes.file.upload.path` with permission 777 before submitting " +
"the Spark application.")
.version("1.11.0")
.booleanConf
.createWithDefault(false)

object KubernetesCleanupDriverPodStrategy extends Enumeration {
type KubernetesCleanupDriverPodStrategy = Value
val NONE, ALL, COMPLETED = Value
@@ -56,7 +56,8 @@ class SparkBatchProcessBuilder(
(batchKyuubiConf.getAll ++
sparkAppNameConf() ++
engineLogPathConf() ++
appendPodNameConf(batchConf)).map { case (k, v) =>
appendPodNameConf(batchConf) ++
prepareK8sFileUploadPath()).map { case (k, v) =>
buffer ++= confKeyValue(convertConfigKey(k), v)
}

@@ -19,12 +19,16 @@ package org.apache.kyuubi.engine.spark

import java.io.{File, FileFilter, IOException}
import java.nio.file.Paths
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi._
@@ -37,7 +41,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, Validator}
import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, KyuubiHadoopUtils, Validator}
import org.apache.kyuubi.util.command.CommandLineUtils._

class SparkProcessBuilder(
@@ -141,7 +145,11 @@
allConf = allConf ++ zkAuthKeytabFileConf(allConf)
}
// pass spark engine log path to spark conf
(allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach {
(allConf ++
engineLogPathConf ++
extraYarnConf(allConf) ++
appendPodNameConf(allConf) ++
prepareK8sFileUploadPath()).foreach {
case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
}

@@ -266,6 +274,43 @@
map.result().toMap
}

def prepareK8sFileUploadPath(): Map[String, String] = {
kubernetesFileUploadPath() match {
case Some(uploadPathPattern) if isK8sClusterMode =>
val today = LocalDate.now()
val uploadPath = uploadPathPattern
.replace("{{YEAR}}", today.format(YEAR_FMT))
.replace("{{MONTH}}", today.format(MONTH_FMT))
.replace("{{DAY}}", today.format(DAY_FMT))

if (conf.get(KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH)) {
// Create the `uploadPath` using permission 777, otherwise, spark just creates the
// `$uploadPath/spark-upload-$uuid` using default permission 511, which might prevent
// other users from creating the staging dir under `uploadPath` later.
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf, loadDefaults = false)
val path = new Path(uploadPath)
var fs: FileSystem = null
try {
fs = path.getFileSystem(hadoopConf)
if (!fs.exists(path)) {
info(s"Try creating $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath")
fs.mkdirs(path, KUBERNETES_UPLOAD_PATH_PERMISSION)
}
} catch {
case ioe: IOException =>
warn(s"Failed to create $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath", ioe)
} finally {
if (fs != null) {
Utils.tryLogNonFatalError(fs.close())
}
}
}
Map(KUBERNETES_FILE_UPLOAD_PATH -> uploadPath)
case _ =>
Map.empty
}
}

def extraYarnConf(conf: Map[String, String]): Map[String, String] = {
val map = mutable.Map.newBuilder[String, String]
if (clusterManager().exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))) {
Expand Down Expand Up @@ -294,6 +339,11 @@ class SparkProcessBuilder(
}
}

def isK8sClusterMode: Boolean = {
clusterManager().exists(cm => cm.toLowerCase(Locale.ROOT).startsWith("k8s")) &&
deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
}

def kubernetesContext(): Option[String] = {
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
}
@@ -302,6 +352,11 @@
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}

def kubernetesFileUploadPath(): Option[String] = {
conf.getOption(KUBERNETES_FILE_UPLOAD_PATH)
.orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH))
}

override def validateConf(): Unit = Validator.validateConf(conf)

// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
@@ -331,6 +386,13 @@
final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
final val INTERNAL_RESOURCE = "spark-internal"

final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
final val KUBERNETES_UPLOAD_PATH_PERMISSION = new FsPermission(Integer.parseInt("777", 8).toShort)

final val YEAR_FMT = DateTimeFormatter.ofPattern("yyyy")
final val MONTH_FMT = DateTimeFormatter.ofPattern("MM")
final val DAY_FMT = DateTimeFormatter.ofPattern("dd")

/**
* The path configs from Spark project that might upload local files:
* - SparkSubmit
@@ -17,6 +17,8 @@

package org.apache.kyuubi.engine.spark

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID

import org.apache.kyuubi.KyuubiFunSuite
@@ -36,4 +38,24 @@
None)
assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
}

test("spark.kubernetes.file.upload.path supports placeholder") {
val conf1 = KyuubiConf(false)
conf1.set("spark.master", "k8s://test:12345")
conf1.set("spark.submit.deployMode", "cluster")
conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}")
val builder1 = new SparkBatchProcessBuilder(
"",
conf1,
UUID.randomUUID().toString,
"test",
Some("test"),
"test",
Map("kyuubi.key" -> "value"),
Seq.empty,
None)
val commands1 = builder1.toString.split(' ')
val today = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$today"))
}
}
@@ -19,7 +19,8 @@ package org.apache.kyuubi.engine.spark

import java.io.File
import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import java.time.Duration
import java.time.{Duration, LocalDate}
import java.time.format.DateTimeFormatter
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}

@@ -468,6 +469,17 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
None)
assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
}

test("spark.kubernetes.file.upload.path supports placeholder") {
val conf1 = KyuubiConf(false)
conf1.set("spark.master", "k8s://test:12345")
conf1.set("spark.submit.deployMode", "cluster")
conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}")
val builder1 = new SparkProcessBuilder("", true, conf1)
val commands1 = builder1.toString.split(' ')
val today = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$today"))
}
}

class FakeSparkProcessBuilder(config: KyuubiConf)