-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22646] [Submission] Spark on Kubernetes - basic submission client #19717
Changes from 8 commits
dcaac45
27c67ff
6d597d0
5b9fa39
5ccadb5
12f2797
c35fe48
faa2849
347ed69
0e8ca01
3a0b8e3
83d0b9c
44c40b1
67bc847
7d2b303
caf2206
2e7810b
cbcd30e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
private val STANDALONE = 2 | ||
private val MESOS = 4 | ||
private val LOCAL = 8 | ||
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | ||
private val KUBERNETES = 16 | ||
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | ||
|
||
// Deploy modes | ||
private val CLIENT = 1 | ||
|
@@ -97,6 +98,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
"org.apache.spark.deploy.yarn.YarnClusterApplication" | ||
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() | ||
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() | ||
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.k8s.submit.Client" | ||
|
||
// scalastyle:off println | ||
private[spark] def printVersionAndExit(): Unit = { | ||
|
@@ -257,9 +259,10 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
YARN | ||
case m if m.startsWith("spark") => STANDALONE | ||
case m if m.startsWith("mesos") => MESOS | ||
case m if m.startsWith("k8s") => KUBERNETES | ||
case m if m.startsWith("local") => LOCAL | ||
case _ => | ||
printErrorAndExit("Master must either be yarn or start with spark, mesos, local") | ||
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local") | ||
-1 | ||
} | ||
|
||
|
@@ -294,6 +297,16 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
} | ||
} | ||
|
||
if (clusterManager == KUBERNETES) { | ||
args.master = Utils.checkAndGetK8sMasterUrl(args.master) | ||
// Make sure Kubernetes is included in our build if we're trying to use it | ||
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) { | ||
printErrorAndExit( | ||
"Could not load KUBERNETES classes. " + | ||
"This copy of Spark may not have been compiled with KUBERNETES support.") | ||
} | ||
} | ||
|
||
// Fail fast, the following modes are not supported or applicable | ||
(clusterManager, deployMode) match { | ||
case (STANDALONE, CLUSTER) if args.isPython => | ||
|
@@ -302,6 +315,12 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
case (STANDALONE, CLUSTER) if args.isR => | ||
printErrorAndExit("Cluster deploy mode is currently not supported for R " + | ||
"applications on standalone clusters.") | ||
case (KUBERNETES, _) if args.isPython => | ||
printErrorAndExit("Python applications are currently not supported for Kubernetes.") | ||
case (KUBERNETES, _) if args.isR => | ||
printErrorAndExit("R applications are currently not supported for Kubernetes.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Not affect the result, but logically I think it is better: case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.") There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
case (KUBERNETES, CLIENT) => | ||
printErrorAndExit("Client mode is currently not supported for Kubernetes.") | ||
case (LOCAL, CLUSTER) => | ||
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"") | ||
case (_, CLUSTER) if isShell(args.primaryResource) => | ||
|
@@ -322,6 +341,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER | ||
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER | ||
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER | ||
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER | ||
|
||
if (!isMesosCluster && !isStandAloneCluster) { | ||
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files | ||
|
@@ -556,20 +576,24 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"), | ||
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"), | ||
|
||
// Kubernetes only | ||
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.kubernetes.namespace"), | ||
|
||
// Other options | ||
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, | ||
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.executor.cores"), | ||
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, | ||
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.executor.memory"), | ||
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, | ||
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.cores.max"), | ||
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, | ||
confKey = "spark.files"), | ||
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), | ||
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"), | ||
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, | ||
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, | ||
confKey = "spark.driver.memory"), | ||
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, | ||
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, | ||
confKey = "spark.driver.cores"), | ||
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, | ||
confKey = "spark.driver.supervise"), | ||
|
@@ -703,6 +727,19 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
} | ||
} | ||
|
||
if (isKubernetesCluster) { | ||
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS | ||
if (args.primaryResource != SparkLauncher.NO_RESOURCE) { | ||
childArgs ++= Array("--primary-java-resource", args.primaryResource) | ||
} | ||
childArgs ++= Array("--main-class", args.mainClass) | ||
if (args.childArgs != null) { | ||
args.childArgs.foreach { arg => | ||
childArgs += ("--arg", arg) | ||
} | ||
} | ||
} | ||
|
||
// Load any properties specified through --conf and the default properties file | ||
for ((k, v) <- args.sparkProperties) { | ||
sparkConf.setIfMissing(k, v) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,6 +81,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | |
var submissionToRequestStatusFor: String = null | ||
var useRest: Boolean = true // used internally | ||
|
||
// Kubernetes only | ||
var kubernetesNamespace: String = null | ||
|
||
/** Default properties present in the currently defined defaults file. */ | ||
lazy val defaultSparkProperties: HashMap[String, String] = { | ||
val defaultProperties = new HashMap[String, String]() | ||
|
@@ -199,6 +202,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | |
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull | ||
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull | ||
|
||
kubernetesNamespace = Option(kubernetesNamespace) | ||
.orElse(sparkProperties.get("spark.kubernetes.namespace")) | ||
.orNull | ||
|
||
// Try to set main class from JAR if no --class argument is given | ||
if (mainClass == null && !isPython && !isR && primaryResource != null) { | ||
val uri = new URI(primaryResource) | ||
|
@@ -454,6 +461,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | |
case KEYTAB => | ||
keytab = value | ||
|
||
case KUBERNETES_NAMESPACE => | ||
kubernetesNamespace = value | ||
|
||
case HELP => | ||
printUsageAndExit(0) | ||
|
||
|
@@ -515,8 +525,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | |
outStream.println( | ||
s""" | ||
|Options: | ||
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local | ||
| (Default: local[*]). | ||
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, | ||
| k8s://https://host:port, or local (Default: local[*]). | ||
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or | ||
| on one of the worker machines inside the cluster ("cluster") | ||
| (Default: client). | ||
|
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | |
| the node running the Application Master via the Secure | ||
| Distributed Cache, for renewing the login tickets and the | ||
| delegation tokens periodically. | ||
| | ||
| Kubernetes only: | ||
| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it a big deal to not add this as a command line arg and force people to use the configuration instead? I'd prefer to not add even more cluster-specific switches to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was more for convenience to allow command line parameters for setting common Kubernetes options. We added these to be similar in spirit to the other cluster managers. We can revert these. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1, this can be removed. |
||
| application must be launched. The namespace must already | ||
| exist in the cluster. (Default: default). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should also add check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now that docker image is without a default value ( tbh I'm generally against adding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if we eventually decide to not have default docker images, we should make the options There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are some messages needed to be updated too, e.g,:
From above, k8s supports killing submissions and requesting submission statuses.
k8s also supports There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually k8s does not yet support |
||
""".stripMargin | ||
) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging { | |
} | ||
} | ||
|
||
/** | ||
* Check the validity of the given Kubernetes master URL and return the resolved URL. | ||
*/ | ||
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { | ||
require(rawMasterURL.startsWith("k8s://"), | ||
"Kubernetes master URL must start with k8s://.") | ||
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we change this String representation to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the changes, URLs like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of the following should be supported and resolved as follows:
Think we just need to use whatever code that is necessary to get to this state. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are those URIs set in stone? This seems more readable to me:
It also allows parsing using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This decision is to match how Mesos works with Zookeeper, as these Strings are valid IIRC: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, did some changes to make all supported formats work. See 51844cc. |
||
if (masterWithoutK8sPrefix.startsWith("https://")) { | ||
masterWithoutK8sPrefix | ||
} else if (masterWithoutK8sPrefix.startsWith("http://")) { | ||
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.") | ||
masterWithoutK8sPrefix | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be happened only when scheme is not existed? From the code looks like if user misconfigured with different scheme, the code will also be executed here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the code only applies if no scheme exists. Fixed. |
||
val resolvedURL = s"https://$masterWithoutK8sPrefix" | ||
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + | ||
s"URL is $resolvedURL.") | ||
resolvedURL | ||
} | ||
} | ||
} | ||
|
||
private[util] object CallerContext extends Logging { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,6 +388,32 @@ class SparkSubmitSuite | |
conf.get("spark.ui.enabled") should be ("false") | ||
} | ||
|
||
test("handles k8s cluster mode") { | ||
val clArgs = Seq( | ||
"--deploy-mode", "cluster", | ||
"--master", "k8s://host:port", | ||
"--executor-memory", "5g", | ||
"--class", "org.SomeClass", | ||
"--kubernetes-namespace", "foo", | ||
"--driver-memory", "4g", | ||
"--conf", "spark.kubernetes.driver.docker.image=bar", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also test the arg "--kubernetes-namespace"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
"/home/thejar.jar", | ||
"arg1") | ||
val appArgs = new SparkSubmitArguments(clArgs) | ||
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) | ||
|
||
val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap | ||
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar")) | ||
childArgsMap.get("--main-class") should be (Some("org.SomeClass")) | ||
childArgsMap.get("--arg") should be (Some("arg1")) | ||
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS) | ||
classpath should have length (0) | ||
conf.get("spark.executor.memory") should be ("5g") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
conf.get("spark.driver.memory") should be ("4g") | ||
conf.get("spark.kubernetes.namespace") should be ("foo") | ||
conf.get("spark.kubernetes.driver.docker.image") should be ("bar") | ||
} | ||
|
||
test("handles confs with flag equivalents") { | ||
val clArgs = Seq( | ||
"--deploy-mode", "cluster", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should say KUBERNETES
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.