[SPARK-22646] [Submission] Spark on Kubernetes - basic submission client #19717

Status: Closed · Changes from 8 commits
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
51 changes: 44 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

// Deploy modes
private val CLIENT = 1
@@ -97,6 +98,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
"org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.k8s.submit.Client"

// scalastyle:off println
private[spark] def printVersionAndExit(): Unit = {
@@ -257,9 +259,10 @@
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}

@@ -294,6 +297,16 @@
}
}

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure YARN is included in our build if we're trying to use it
Member: this should say KUBERNETES

Contributor Author: Done.

if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
}
}
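
Note: Utils.classIsLoadable is essentially a reflective existence check. A minimal sketch of the idea in isolation (assuming the thread context classloader; Spark's actual helper may choose its loader differently):

    import scala.util.Try

    // Sketch only: true if the class can be found without initializing it.
    def classIsLoadable(clazz: String): Boolean = Try {
      Class.forName(clazz, false, Thread.currentThread().getContextClassLoader)
    }.isSuccess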

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
@@ -302,6 +315,12 @@
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
Member: nit: Doesn't affect the result, but logically I think this ordering is better:

    case (KUBERNETES, _) if args.isPython =>
      printErrorAndExit("Python applications are currently not supported for Kubernetes.")
    case (KUBERNETES, _) if args.isR =>
      printErrorAndExit("R applications are currently not supported for Kubernetes.")
    case (KUBERNETES, CLIENT) =>
      printErrorAndExit("Client mode is currently not supported for Kubernetes.")

Contributor Author: Done.

case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -322,6 +341,7 @@
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -556,20 +576,24 @@
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Kubernetes only
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.kubernetes.namespace"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
@@ -703,6 +727,19 @@
}
}

if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
}
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
}
}
}
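
Concretely (mirroring the "handles k8s cluster mode" test added below), a cluster-mode submission with --class org.SomeClass, primary resource /home/thejar.jar, and application argument arg1 would produce child arguments along these lines:

    // Illustrative expected value, matching the SparkSubmitSuite assertions below:
    val expectedChildArgs = Seq(
      "--primary-java-resource", "file:/home/thejar.jar",
      "--main-class", "org.SomeClass",
      "--arg", "arg1")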

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -81,6 +81,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var submissionToRequestStatusFor: String = null
var useRest: Boolean = true // used internally

// Kubernetes only
var kubernetesNamespace: String = null

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
@@ -199,6 +202,10 @@
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

kubernetesNamespace = Option(kubernetesNamespace)
.orElse(sparkProperties.get("spark.kubernetes.namespace"))
.orNull

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
@@ -454,6 +461,9 @@
case KEYTAB =>
keytab = value

case KUBERNETES_NAMESPACE =>
kubernetesNamespace = value

case HELP =>
printUsageAndExit(0)

@@ -515,8 +525,8 @@
outStream.println(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local
| (Default: local[*]).
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
| k8s://https://host:port, or local (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).
@@ -590,6 +600,11 @@
| the node running the Application Master via the Secure
| Distributed Cache, for renewing the login tickets and the
| delegation tokens periodically.
|
| Kubernetes only:
| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the
Contributor: Is it a big deal to not add this as a command line arg and force people to use the configuration instead? I'd prefer to not add even more cluster-specific switches to SparkSubmit, at least not until it is refactored to be pluggable.

Contributor Author: I don't think it's absolutely necessary to have a parameter for the namespace. --kubernetes-namespace is not significantly shorter than spark.kubernetes.namespace. @mccheah @foxish WDYT?

Contributor: This was more for convenience, to allow command line parameters for setting common Kubernetes options. We added these to be similar in spirit to the other cluster managers. We can revert these.

Contributor Author: Reverted.

Contributor: +1, this can be removed.

| application must be launched. The namespace must already
| exist in the cluster. (Default: default).
Contributor: We should also add checks to validateKillArguments and validateStatusRequestArguments.

Contributor Author: Done.

Member: Now that the docker image has no default value (spark.kubernetes.*.docker.image), (yes, that discussion is ongoing) I wonder if it makes sense to bubble that up as a --param for visibility/convenience. tbh I'm generally against adding --params to submit because of the potential confusion they can cause, but since we are here and there's a --kubernetes-namespace...

Contributor Author: I think if we eventually decide not to have default docker images, we should make those options --param ones. I'm not sure we want to make that call in this PR though. Can we defer this to a later time, when we are clearer on how we publish and maintain the images?

Member: There are some messages that need to be updated too, e.g.:

    | Spark standalone or Mesos with cluster deploy mode only:
    |  --supervise                 If given, restarts the driver on failure.
    |  --kill SUBMISSION_ID        If given, kills the driver specified.
    |  --status SUBMISSION_ID      If given, requests the status of the driver specified.

From the above, k8s supports killing submissions and requesting submission statuses.

    | Spark standalone and Mesos only:
    |  --total-executor-cores NUM  Total cores for all executors.

k8s also supports the totalExecutorCores option.

Contributor Author: Actually, k8s does not yet support kill and status, nor does it support spark.cores.max yet. Updated validateKillArguments and validateStatusRequestArguments.

""".stripMargin
)

19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2744,6 +2744,25 @@
}
}

/**
* Check the validity of the given Kubernetes master URL and return the resolved URL.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
require(rawMasterURL.startsWith("k8s://"),
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
Contributor: Can we change this String representation to a URI to make the check below more robust?

Contributor Author: Done.

Contributor Author (liyinan926, Dec 5, 2017): With the changes, URLs like k8s://host:port are no longer valid, as the part host:port is parsed as a URI with host as the scheme. Instead, k8s:///host:port should be used. I'm not super familiar with URIs, so I'm not sure if this is desirable, or whether we should stop automatically appending https:// completely. /cc @mccheah @foxish

Contributor: All of the following should be supported and resolved as follows:

  • k8s://host:port -> https://host:port
  • k8s://https://host:port -> https://host:port
  • k8s://http://host:port -> http://host:port

Think we just need to use whatever code is necessary to get to this state.

Contributor: Are those URIs set in stone? This seems more readable to me:

    scala> new URI("k8s:http://foo:1234")
    res0: java.net.URI = k8s:http://foo:1234

    scala> res0.getScheme
    res1: String = k8s

    scala> res0.getSchemeSpecificPart
    res2: String = http://foo:1234

It also allows parsing using the URI APIs instead of doing string manipulation.

Contributor: This decision is to match how Mesos works with Zookeeper, as these strings are valid IIRC: mesos://zk://host1:2181,host2:2181,host3:2181/mesos. That's just the precedent we are using, but we can use a different convention.

Contributor Author: OK, did some changes to make all supported formats work. See 51844cc.

if (masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else if (masterWithoutK8sPrefix.startsWith("http://")) {
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
masterWithoutK8sPrefix
} else {
Contributor: Should this happen only when no scheme exists? From the code, it looks like this branch will also be executed if the user misconfigured a different scheme.

Contributor Author: Yeah, the code should only apply if no scheme exists. Fixed.

val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
resolvedURL
}
}
}
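
A side note on the review thread above: because new URI("host:port") treats host as the scheme, the three supported formats can also be resolved with a plain prefix check. A minimal sketch of that resolution (not the merged implementation; see 51844cc for what was actually committed):

    // Sketch only: resolves k8s://host:port, k8s://https://host:port and
    // k8s://http://host:port per the rules discussed in the review thread.
    def resolveK8sMasterUrl(rawMasterURL: String): String = {
      require(rawMasterURL.startsWith("k8s://"),
        "Kubernetes master URL must start with k8s://.")
      val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
      if (masterWithoutK8sPrefix.startsWith("https://") ||
          masterWithoutK8sPrefix.startsWith("http://")) {
        masterWithoutK8sPrefix                    // explicit scheme, keep as-is
      } else if (masterWithoutK8sPrefix.contains("://")) {
        throw new IllegalArgumentException(
          s"Invalid Kubernetes master scheme in $rawMasterURL")
      } else {
        s"https://$masterWithoutK8sPrefix"        // no scheme, default to https
      }
    }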

private[util] object CallerContext extends Logging {
26 changes: 26 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -388,6 +388,32 @@
conf.get("spark.ui.enabled") should be ("false")
}

test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--kubernetes-namespace", "foo",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.driver.docker.image=bar",
Contributor: Should we also test the arg "--kubernetes-namespace"?

Contributor Author: --kubernetes-namespace has been removed in this PR.

Contributor Author: Done.

"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.executor.memory") should be ("5g")
Contributor: Check spark.master too?

Contributor Author: Done.

conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("foo")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
}

test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
11 changes: 11 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1146,6 +1146,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
}

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps == "https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp == "http://host:port")

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
}
}
}

private class SimpleExtension
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -76,13 +76,16 @@ class SparkSubmitOptionParser {
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

// Kubernetes-only options.
protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
* different aliases for the same option; the first element of each entry is the "official"
* name of the option, passed to {@link #handle(String, String)}.
* <p>
* Options not listed here nor in the "switch" list below will result in a call to
* {@link $#handleUnknown(String)}.
* {@link #handleUnknown(String)}.
* <p>
* These two arrays are visible for tests.
*/
@@ -115,6 +118,7 @@
{ REPOSITORIES },
{ STATUS },
{ TOTAL_EXECUTOR_CORES },
{ KUBERNETES_NAMESPACE },
};

/**