From 38953dc3f38940b2cb8869d31329902468635e1f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 30 Dec 2024 14:21:08 +0800 Subject: [PATCH 1/6] Support rolling spark.kubernetes.file.upload.path --- docs/deployment/engine_on_kubernetes.md | 6 ++ .../org/apache/kyuubi/config/KyuubiConf.scala | 9 +++ .../spark/SparkBatchProcessBuilder.scala | 3 +- .../engine/spark/SparkProcessBuilder.scala | 67 ++++++++++++++++++- .../spark/SparkBatchProcessBuilderSuite.scala | 22 ++++++ .../spark/SparkProcessBuilderSuite.scala | 14 +++- 6 files changed, 117 insertions(+), 4 deletions(-)

diff --git a/docs/deployment/engine_on_kubernetes.md b/docs/deployment/engine_on_kubernetes.md index 7d94286bde7..2d46cda7cb0 100644 --- a/docs/deployment/engine_on_kubernetes.md +++ b/docs/deployment/engine_on_kubernetes.md @@ -48,6 +48,12 @@ The minimum required configurations are: * spark.kubernetes.file.upload.path (path on S3 or HDFS) * spark.kubernetes.authenticate.driver.serviceAccountName ([viz ServiceAccount](#serviceaccount)) +Vanilla Spark supports neither a rolling nor an expiration mechanism for `spark.kubernetes.file.upload.path`. If you use a +file system that does not support TTL, e.g. HDFS, additional cleanup mechanisms are needed to prevent the files in this +directory from growing indefinitely. Since Kyuubi v1.11.0, you can configure `spark.kubernetes.file.upload.path` with +placeholders `{{YEAR}}`, `{{MONTH}}` and `{{DAY}}`, and enable `kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled` +to let the Kyuubi server create the directory with 777 permission automatically before submitting the Spark application. + ### Docker Image Spark ships a `./bin/docker-image-tool.sh` script to build and publish the Docker images for running Spark applications on Kubernetes. diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index a493d7c4578..d27df47c021 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1334,6 +1334,15 @@ object KyuubiConf { .createWithDefault( "http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}}") + val KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH: ConfigEntry[Boolean] = buildConf("kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled") .doc("If enabled, Kyuubi server will try to create the " + "`spark.kubernetes.file.upload.path` with permission 777 before submitting " + "the Spark application.") .version("1.11.0") .booleanConf .createWithDefault(false) + object KubernetesCleanupDriverPodStrategy extends Enumeration { type KubernetesCleanupDriverPodStrategy = Value val NONE, ALL, COMPLETED = Value diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 713a34d0c87..11b4e03dde7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -56,7 +56,8 @@ class SparkBatchProcessBuilder( (batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf() ++ - appendPodNameConf(batchConf)).map { case (k, v) => + appendPodNameConf(batchConf) ++ + prepareK8sFileUploadPath()).map { case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v) } diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index aacdddef32b..b0bf9fd4e36 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -19,12 +19,17 @@ package org.apache.kyuubi.engine.spark import java.io.{File, FileFilter, IOException} import java.nio.file.Paths +import java.time.LocalDate +import java.time.format.DateTimeFormatter import java.util.Locale import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi._ @@ -37,7 +42,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.operation.log.OperationLog -import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, Validator} +import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, KyuubiHadoopUtils, Validator} import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkProcessBuilder( @@ -141,7 +146,11 @@ class SparkProcessBuilder( allConf = allConf ++ zkAuthKeytabFileConf(allConf) } // pass spark engine log path to spark conf - (allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach { + (allConf ++ + engineLogPathConf ++ + extraYarnConf(allConf) ++ + appendPodNameConf(allConf) ++ + prepareK8sFileUploadPath()).foreach { case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v) } @@ -266,6 +275,40 @@ class SparkProcessBuilder( map.result().toMap } + def prepareK8sFileUploadPath(): Map[String, String] = { + kubernetesFileUploadPath() match { + case Some(uploadPathPattern) if isK8sClusterMode => + val today = LocalDate.now() + val uploadPath = uploadPathPattern + .replace("{{YEAR}}", today.format(YEAR_FMT)) + .replace("{{MONTH}}", today.format(MONTH_FMT)) + .replace("{{DAY}}", today.format(DAY_FMT)) + + if (conf.get(KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH)) { + val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf, loadDefaults = false) + val path = new Path(uploadPath) + var fs: FileSystem = null + try { + fs = path.getFileSystem(hadoopConf) + if (!fs.exists(path)) { + info(s"Try creating $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath") + fs.mkdirs(path, KUBERNETES_UPLOAD_PATH_PERMISSION) + } + } catch { + case ioe: IOException => + warn(s"Failed to create $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath", ioe) + } finally { + if (fs != null) { + Utils.tryLogNonFatalError(fs.close()) + } + } + } + Map(KUBERNETES_FILE_UPLOAD_PATH -> uploadPath) + case None => + Map.empty + } + } + def extraYarnConf(conf: Map[String, String]): Map[String, String] = { val map = mutable.Map.newBuilder[String, String] if (clusterManager().exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))) { @@ -294,6 +337,14 @@ class SparkProcessBuilder( } } + def isK8sClusterMode: Boolean = { + clusterManager().map(_.toLowerCase(Locale.ROOT)) match { + case Some(m) if m.startsWith("k8s") => + deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster") + case _ => false + } + } + def kubernetesContext(): Option[String] = { 
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY)) } @@ -302,6 +353,11 @@ conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY)) } + def kubernetesFileUploadPath(): Option[String] = { + conf.getOption(KUBERNETES_FILE_UPLOAD_PATH) + .orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH)) + } + override def validateConf(): Unit = Validator.validateConf(conf) // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user @@ -331,6 +387,13 @@ object SparkProcessBuilder { final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts" final val INTERNAL_RESOURCE = "spark-internal" + final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path" + final val KUBERNETES_UPLOAD_PATH_PERMISSION = new FsPermission(Integer.parseInt("777", 8).toShort) + + final val YEAR_FMT = DateTimeFormatter.ofPattern("yyyy") + final val MONTH_FMT = DateTimeFormatter.ofPattern("MM") + final val DAY_FMT = DateTimeFormatter.ofPattern("dd") + /** * The path configs from Spark project that might upload local files: * - SparkSubmit diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala index e3603e24ec9..f858a0f7784 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.engine.spark +import java.time.LocalDate +import java.time.format.DateTimeFormatter import java.util.UUID import org.apache.kyuubi.KyuubiFunSuite @@ -36,4 +38,24 @@ class SparkBatchProcessBuilderSuite extends KyuubiFunSuite { None) assert(builder.commands.toSeq.contains("spark.kyuubi.key=value")) } + + test("spark.kubernetes.file.upload.path supports placeholder") { + val conf1 = KyuubiConf(false) + conf1.set("spark.master", "k8s://test:12345") + conf1.set("spark.submit.deployMode", "cluster") + conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}") + val builder1 = new SparkBatchProcessBuilder( + "", + conf1, + UUID.randomUUID().toString, + "test", + Some("test"), + "test", + Map("kyuubi.key" -> "value"), + Seq.empty, + None) + val commands1 = builder1.toString.split(' ') + val today = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()) + assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$today")) + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 5f3bae12497..49e4a91568a 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -19,7 +19,8 @@ package org.apache.kyuubi.engine.spark import java.io.File import java.nio.file.{Files, Path, Paths, StandardOpenOption} -import java.time.Duration +import java.time.{Duration, LocalDate} +import java.time.format.DateTimeFormatter import java.util.UUID import java.util.concurrent.{Executors, TimeUnit} @@ -468,6 +469,17 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { None) 
assert(builder.commands.toSeq.contains("spark.kyuubi.key=value")) } + + test("spark.kubernetes.file.upload.path supports placeholder") { + val conf1 = KyuubiConf(false) + conf1.set("spark.master", "k8s://test:12345") + conf1.set("spark.submit.deployMode", "cluster") + conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}") + val builder1 = new SparkProcessBuilder("", true, conf1) + val commands1 = builder1.toString.split(' ') + val today = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()) + assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$today")) + } } class FakeSparkProcessBuilder(config: KyuubiConf) From 70698977826caac7ce40ea7527eb3c90eb03a63a Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 30 Dec 2024 14:31:20 +0800 Subject: [PATCH 2/6] docs --- docs/configuration/settings.md | 1 + .../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index bb81d8de534..3c17e25b81a 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -362,6 +362,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | | kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | | kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 | +| kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled | false | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. | boolean | 1.11.0 | | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 | | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi--driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. 
| boolean | 1.8.1 | diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index b0bf9fd4e36..936e690d5e4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -27,7 +27,6 @@ import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.security.UserGroupInformation From 3eade8bc4bca981c54465ada19e8d3a5144f6d4c Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 30 Dec 2024 14:57:56 +0800 Subject: [PATCH 3/6] fix --- .../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 936e690d5e4..010fe954fbf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -303,7 +303,7 @@ class SparkProcessBuilder( } } Map(KUBERNETES_FILE_UPLOAD_PATH -> uploadPath) - case None => + case _ => Map.empty } } From 343adaefb51b917cb4b3b764904a6bb0304e1e07 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 31 Dec 2024 10:25:03 +0800 Subject: [PATCH 4/6] review --- .../apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 010fe954fbf..b9b029c249b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -337,11 +337,8 @@ class SparkProcessBuilder( } def isK8sClusterMode: Boolean = { - clusterManager().map(_.toLowerCase(Locale.ROOT)) match { - case Some(m) if m.startsWith("k8s") => - deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster") - case _ => false - } + clusterManager().exists(cm => cm.toLowerCase(Locale.ROOT).startsWith("k8s")) && + deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster") } def kubernetesContext(): Option[String] = { From 5d5cb3eb318cbe3eb58d5313324cfee2845e83c3 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 31 Dec 2024 10:37:55 +0800 Subject: [PATCH 5/6] docs --- docs/deployment/engine_on_kubernetes.md | 21 +++++++++++++++++++ .../engine/spark/SparkProcessBuilder.scala | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/docs/deployment/engine_on_kubernetes.md b/docs/deployment/engine_on_kubernetes.md index 2d46cda7cb0..c7cbb25fd51 100644 --- a/docs/deployment/engine_on_kubernetes.md +++ b/docs/deployment/engine_on_kubernetes.md @@ -54,6 +54,27 @@ directory from growing indefinitely. 
Since Kyuubi v1.11.0, you can configure `sp placeholders `{{YEAR}}`, `{{MONTH}}` and `{{DAY}}`, and enable `kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled` to let the Kyuubi server create the directory with 777 permission automatically before submitting the Spark application. +Note that Spark creates a sub-directory `spark-upload-${UUID.randomUUID()}` under `spark.kubernetes.file.upload.path` +for each upload, so the administrator still needs to clean up the staging directory periodically. + +For example, you can set the following configurations in `kyuubi-defaults.conf` to enable monthly rolling support +for `spark.kubernetes.file.upload.path`: + +``` +kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled=true +spark.kubernetes.file.upload.path=hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}} +``` + +and the staging files would look like + +``` +hdfs://hadoop-cluster/spark-upload-202412/spark-upload-f2b71340-dc1d-4940-89e2-c5fc31614eb4 +hdfs://hadoop-cluster/spark-upload-202412/spark-upload-173a8653-4d3e-48c0-b8ab-b7f92ae582d6 +hdfs://hadoop-cluster/spark-upload-202501/spark-upload-3b22710f-a4a0-40bb-a3a8-16e481038a63 +``` + +then the administrator can safely delete `hdfs://hadoop-cluster/spark-upload-202412` after 2025-01-01. + ### Docker Image Spark ships a `./bin/docker-image-tool.sh` script to build and publish the Docker images for running Spark applications on Kubernetes. diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index b9b029c249b..552c495523a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -338,7 +338,7 @@ class SparkProcessBuilder( def isK8sClusterMode: Boolean = { clusterManager().exists(cm => cm.toLowerCase(Locale.ROOT).startsWith("k8s")) && - deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster") + deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster") } def kubernetesContext(): Option[String] = { From 6614bf29c653a2617727db9ef5c1482ff8580b5d Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 31 Dec 2024 10:45:56 +0800 Subject: [PATCH 6/6] comment --- .../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 552c495523a..7c862f52180 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -284,6 +284,9 @@ class SparkProcessBuilder( .replace("{{DAY}}", today.format(DAY_FMT)) if (conf.get(KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH)) { + // Create the `uploadPath` with permission 777; otherwise, Spark just creates + // `$uploadPath/spark-upload-$uuid` with the default permission 511, which might prevent + // other users from creating staging dirs under `uploadPath` later. val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf, loadDefaults = false) val path = new Path(uploadPath) var fs: FileSystem = null
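
---

Aside (not part of the patches): below is a minimal, self-contained Scala sketch of the `{{YEAR}}`/`{{MONTH}}`/`{{DAY}}` substitution that `prepareK8sFileUploadPath()` performs, runnable outside Kyuubi. The `renderUploadPath` helper and the demo object name are hypothetical, introduced only for illustration; the patches inline the equivalent logic directly in `SparkProcessBuilder`.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object UploadPathPlaceholderDemo {
  // Mirrors the YEAR_FMT / MONTH_FMT / DAY_FMT constants introduced in SparkProcessBuilder.
  private val YearFmt = DateTimeFormatter.ofPattern("yyyy")
  private val MonthFmt = DateTimeFormatter.ofPattern("MM")
  private val DayFmt = DateTimeFormatter.ofPattern("dd")

  // Hypothetical helper: expands the date placeholders the same way the patch does.
  def renderUploadPath(pattern: String, date: LocalDate = LocalDate.now()): String =
    pattern
      .replace("{{YEAR}}", date.format(YearFmt))
      .replace("{{MONTH}}", date.format(MonthFmt))
      .replace("{{DAY}}", date.format(DayFmt))

  def main(args: Array[String]): Unit = {
    val rendered = renderUploadPath(
      "hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}}",
      LocalDate.of(2024, 12, 30))
    // Prints: hdfs://hadoop-cluster/spark-upload-202412
    println(rendered)
  }
}
```

For a pattern like `hdfs://hadoop-cluster/spark-upload-{{YEAR}}{{MONTH}}`, re-running the substitution in a new month resolves to a fresh directory, which is what makes the monthly rolling cleanup described in the docs change above safe.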