Spark on Kubernetes - basic submission client
liyinan926 committed Nov 22, 2017
1 parent 7475a96 commit 60234a2
Showing 33 changed files with 2,623 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
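Like the other optional-module profiles in this file (for example the hive profile that follows), the new kubernetes profile keeps spark-kubernetes out of the assembly unless the build is run with -Pkubernetes, which is exactly the flag the Travis command above now passes.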
37 changes: 31 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

// Deploy modes
private val CLIENT = 1
@@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
@@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -316,6 +324,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -555,20 +564,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Kubernetes only
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.kubernetes.namespace"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
}
}

if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += "--arg"
childArgs += arg
}
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
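The cluster managers are plain bit flags, so introducing KUBERNETES = 16 lets each OptionAssigner above declare with a single mask which managers and deploy modes should copy a command-line value into a Spark conf key. A minimal Scala sketch of that gating, assuming a stripped-down OptionAssigner and an illustrative helper (the real class and resolution logic carry more fields and steps):

// Bit flags and masks as in the hunks above.
val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16
val CLIENT = 1; val CLUSTER = 2; val ALL_DEPLOY_MODES = CLIENT | CLUSTER

// Stripped-down OptionAssigner, for illustration only.
case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, confKey: String)

// Hypothetical helper: keep the assigners whose masks match the resolved cluster
// manager and deploy mode, and turn them into conf entries.
def resolvedConf(opts: Seq[OptionAssigner], clusterManager: Int, deployMode: Int): Map[String, String] =
  opts.filter { o =>
    o.value != null &&
      (o.clusterManager & clusterManager) != 0 &&
      (o.deployMode & deployMode) != 0
  }.map(o => o.confKey -> o.value).toMap

// For a Kubernetes cluster-mode submission (clusterManager = KUBERNETES, deployMode = CLUSTER),
// spark.kubernetes.namespace, spark.executor.memory and spark.driver.memory all pass the test.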
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -81,6 +81,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var submissionToRequestStatusFor: String = null
var useRest: Boolean = true // used internally

// Kubernetes only
var kubernetesNamespace: String = null

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
@@ -199,6 +202,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

kubernetesNamespace = Option(kubernetesNamespace)
.orElse(sparkProperties.get("spark.kubernetes.namespace"))
.orNull

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
@@ -454,6 +461,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KEYTAB =>
keytab = value

case KUBERNETES_NAMESPACE =>
kubernetesNamespace = value

case HELP =>
printUsageAndExit(0)

@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| the node running the Application Master via the Secure
| Distributed Cache, for renewing the login tickets and the
| delegation tokens periodically.
|
| Kubernetes only:
| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the
| application must be launched. The namespace must already
| exist in the cluster. (Default: default).
""".stripMargin
)

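The merge step shown above only consults spark.kubernetes.namespace when the field is still null, so the --kubernetes-namespace flag always wins over a value supplied through --conf or the defaults file. A small Scala sketch of that precedence, with made-up property values:

var kubernetesNamespace: String = null                               // would be set by --kubernetes-namespace
val sparkProperties = Map("spark.kubernetes.namespace" -> "team-a")  // e.g. from --conf or spark-defaults.conf

kubernetesNamespace = Option(kubernetesNamespace)
  .orElse(sparkProperties.get("spark.kubernetes.namespace"))
  .orNull
// kubernetesNamespace is now "team-a"; had the flag been passed, its value would have been kept.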
26 changes: 26 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -385,6 +385,32 @@ class SparkSubmitSuite
conf.get("spark.ui.enabled") should be ("false")
}

test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--kubernetes-namespace", "foo",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.driver.docker.image=bar",
"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
classpath should have length (0)
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("foo")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
}

test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
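The new test leans on the fact that the submission client receives its input as flag/value pairs, which is why childArgs.grouped(2) can rebuild a map. For the command in the test, the child arguments produced by the Kubernetes branch of prepareSubmitEnvironment would look roughly like this (a sketch, not output captured from the suite):

val childArgs = Seq(
  "--primary-java-resource", "file:/home/thejar.jar",
  "--main-class", "org.SomeClass",
  "--arg", "arg1")

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
// Map("--primary-java-resource" -> "file:/home/thejar.jar",
//     "--main-class" -> "org.SomeClass", "--arg" -> "arg1")

Since toMap keeps one value per key, the pairing trick works here only because a single application argument is passed; repeated --arg entries would collapse to the last one.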
8 changes: 8 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -531,6 +531,14 @@ def __hash__(self):
sbt_test_goals=["mesos/test"]
)

kubernetes = Module(
name="kubernetes",
dependencies=[],
source_file_regexes=["resource-managers/kubernetes/core"],
build_profile_flags=["-Pkubernetes"],
sbt_test_goals=["kubernetes/test"]
)

# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
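Judging from how the other entries in modules.py drive the test infrastructure, registering the module with source_file_regexes and build_profile_flags means the pull request builder adds -Pkubernetes and runs kubernetes/test only when files under resource-managers/kubernetes/core are touched.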
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -76,13 +76,16 @@ class SparkSubmitOptionParser {
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

// Kubernetes-only options.
protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
* different aliases for the same option; the first element of each entry is the "official"
* name of the option, passed to {@link #handle(String, String)}.
* <p>
* Options not listed here nor in the "switch" list below will result in a call to
* {@link $#handleUnknown(String)}.
* {@link #handleUnknown(String)}.
* <p>
* These two arrays are visible for tests.
*/
@@ -115,6 +118,7 @@ class SparkSubmitOptionParser {
{ REPOSITORIES },
{ STATUS },
{ TOTAL_EXECUTOR_CORES },
{ KUBERNETES_NAMESPACE },
};

/**
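Listing KUBERNETES_NAMESPACE in the opts array is what makes the launcher's parser accept a value for --kubernetes-namespace and pass the pair to handle(String, String), where the new case in SparkSubmitArguments picks it up; as the Javadoc above notes, a flag missing from both arrays falls through to handleUnknown(String) instead.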
7 changes: 7 additions & 0 deletions pom.xml
@@ -2662,6 +2662,13 @@
</modules>
</profile>

<profile>
<id>kubernetes</id>
<modules>
<module>resource-managers/kubernetes/core</module>
</modules>
</profile>

<profile>
<id>hive-thriftserver</id>
<modules>
8 changes: 4 additions & 4 deletions project/SparkBuild.scala
@@ -53,11 +53,11 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

val optionallyEnabledProjects@Seq(mesos, yarn,
val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
streamingFlumeSink, streamingFlume,
streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
dockerIntegrationTests, hadoopCloud) =
Seq("mesos", "yarn",
Seq("kubernetes", "mesos", "yarn",
"streaming-flume-sink", "streaming-flume",
"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
publish := {},

unidocProjectFilter in(ScalaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),

unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
102 changes: 102 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>3.0.0</kubernetes.client.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Required by kubernetes-client but we exclude it -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>

<!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
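Beyond spark-core itself, the main new runtime dependency is the fabric8 kubernetes-client, which is what a submission client uses to talk to the Kubernetes API server, for example to create the driver pod. A hedged Scala sketch of the kind of call it enables; the pod spec, names and namespace below are invented for illustration and are not code from this commit:

import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

val client = new DefaultKubernetesClient()   // resolves kubeconfig or in-cluster settings
val driverPod = new PodBuilder()
  .withNewMetadata().withName("spark-driver").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("spark-kubernetes-driver").withImage("bar").endContainer()
    .withRestartPolicy("Never")
  .endSpec()
  .build()
client.pods().inNamespace("foo").create(driverPod)   // "foo" standing in for spark.kubernetes.namespace
client.close()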