Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Permalink
Allow user-specified environment variables and secrets in the init-co…
Browse files Browse the repository at this point in the history
…ntainer (#564)

* Allow setting user-specified environments in the init-container

* Use driver/executor env keys for the init-container

* Mount user-specified driver/executor secrets

* Addressed comments
  • Loading branch information
liyinan926 authored and mccheah committed Dec 4, 2017
1 parent 15a333c commit 0612195
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._

/**
Expand Down Expand Up @@ -47,7 +51,9 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
filesDownloadPath: String,
downloadTimeoutMinutes: Long,
initContainerConfigMapName: String,
initContainerConfigMapKey: String)
initContainerConfigMapKey: String,
sparkRole: String,
sparkConf: SparkConf)
extends SparkPodInitContainerBootstrap {

override def bootstrapInitContainerAndVolumes(
Expand All @@ -62,17 +68,32 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
.withMountPath(filesDownloadPath)
.build())

val initContainerCustomEnvVarKeyPrefix = sparkRole match {
case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
}
val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(initContainerCustomEnvVarKeyPrefix)
.toSeq
.map(env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build())

val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer)
.withName(s"spark-init")
.withImage(initContainerImage)
.withImagePullPolicy(dockerImagePullPolicy)
.addAllToEnv(initContainerCustomEnvVars.asJava)
.addNewVolumeMount()
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
.withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
.endVolumeMount()
.addToVolumeMounts(sharedVolumeMounts: _*)
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
.build()

val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod)
.editSpec()
.addNewVolume()
Expand All @@ -95,6 +116,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
.endVolume()
.endSpec()
.build()

val mainContainerWithMountedFiles = new ContainerBuilder(
podWithDetachedInitContainer.mainContainer)
.addToVolumeMounts(sharedVolumeMounts: _*)
Expand All @@ -103,6 +125,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
.withValue(filesDownloadPath)
.endEnv()
.build()

PodWithDetachedInitContainer(
podWithBasicVolumes,
initContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
import org.apache.spark.util.Utils

Expand All @@ -43,7 +43,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
private val submittedResourcesSecretName = s"$kubernetesResourceNamePrefix-init-secret"
private val resourceStagingServerUri = submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI)
private val resourceStagingServerInternalUri =
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
private val initContainerImage = submissionSparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
private val downloadTimeoutMinutes = submissionSparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
private val maybeResourceStagingServerInternalTrustStore =
Expand Down Expand Up @@ -92,46 +92,62 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(

def getAllConfigurationSteps(): Seq[InitContainerConfigurationStep] = {
val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl(
initContainerImage,
dockerImagePullPolicy,
jarsDownloadPath,
filesDownloadPath,
downloadTimeoutMinutes,
initContainerConfigMapName,
initContainerConfigMapKey)
initContainerImage,
dockerImagePullPolicy,
jarsDownloadPath,
filesDownloadPath,
downloadTimeoutMinutes,
initContainerConfigMapName,
initContainerConfigMapKey,
SPARK_POD_DRIVER_ROLE,
submissionSparkConf)
val baseInitContainerStep = new BaseInitContainerConfigurationStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
initContainerConfigMapName,
initContainerConfigMapKey,
initContainerBootstrap)
val submittedResourcesInitContainerStep = resourceStagingServerUri.map {
stagingServerUri =>
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
initContainerConfigMapName,
initContainerConfigMapKey,
initContainerBootstrap)

val submittedResourcesInitContainerStep = resourceStagingServerUri.map { stagingServerUri =>
val mountSecretPlugin = new InitContainerResourceStagingServerSecretPluginImpl(
submittedResourcesSecretName,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
submittedResourcesSecretName,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
val submittedDependencyUploader = new SubmittedDependencyUploaderImpl(
driverLabels,
namespace,
stagingServerUri,
sparkJars,
sparkFiles,
new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions,
RetrofitClientFactoryImpl)
driverLabels,
namespace,
stagingServerUri,
sparkJars,
sparkFiles,
new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions,
RetrofitClientFactoryImpl)
new SubmittedResourcesInitContainerConfigurationStep(
submittedResourcesSecretName,
resourceStagingServerInternalUri.getOrElse(stagingServerUri),
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
resourceStagingServerInternalSslEnabled,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStorePassword,
maybeResourceStagingServerInternalTrustStoreType,
submittedDependencyUploader,
mountSecretPlugin)
submittedResourcesSecretName,
resourceStagingServerInternalUri.getOrElse(stagingServerUri),
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
resourceStagingServerInternalSslEnabled,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStorePassword,
maybeResourceStagingServerInternalTrustStoreType,
submittedDependencyUploader,
mountSecretPlugin)
}

val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_SECRETS_PREFIX,
"driver secrets")
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
Some(new InitContainerMountSecretsStep(mountSecretsBootstrap))
} else {
None
}
Seq(baseInitContainerStep) ++ submittedResourcesInitContainerStep.toSeq

Seq(baseInitContainerStep) ++
submittedResourcesInitContainerStep.toSeq ++
mountSecretsStep.toSeq
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer

import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap

/**
* An init-container configuration step for mounting user-specified secrets onto user-specified
* paths.
*
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class InitContainerMountSecretsStep(
mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {

override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = {
val (podWithSecretsMounted, initContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(
initContainerSpec.podToInitialize,
initContainerSpec.initContainer)
initContainerSpec.copy(
podToInitialize = podWithSecretsMounted,
initContainer = initContainerWithSecretsMounted
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
Expand All @@ -44,6 +44,7 @@ private[spark] class ExecutorPodFactoryImpl(
mountSecretsBootstrap: Option[MountSecretsBootstrap],
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap],
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider)
extends ExecutorPodFactory {
Expand Down Expand Up @@ -82,9 +83,6 @@ private[spark] class ExecutorPodFactoryImpl(
private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
private val blockmanagerPort = sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
private val kubernetesDriverPodName = sparkConf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))

private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)

Expand Down Expand Up @@ -234,6 +232,7 @@ private[spark] class ExecutorPodFactoryImpl(
bootstrap.mountSmallFilesSecret(
withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)
}.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer))

val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
executorInitContainerBootstrap.map { bootstrap =>
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(
Expand All @@ -247,8 +246,13 @@ private[spark] class ExecutorPodFactoryImpl(
podWithDetachedInitContainer.initContainer)
}.getOrElse(podWithDetachedInitContainer.initContainer)

val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) =
executorInitContainerMountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer)
}.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer)

val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(
podWithDetachedInitContainer.pod, resolvedInitContainer)
mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted)

val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin =>
plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ThreadUtils

private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {

Expand Down Expand Up @@ -78,7 +78,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION),
sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT),
configMap,
configMapKey)
configMapKey,
SPARK_POD_EXECUTOR_ROLE,
sparkConf)
}

val mountSmallFilesBootstrap = for {
Expand All @@ -95,6 +97,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
} else {
None
}
val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
} else {
None
}

if (maybeInitContainerConfigMap.isEmpty) {
logWarning("The executor's init-container config map was not specified. Executors will" +
Expand Down Expand Up @@ -133,6 +140,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
mountSecretBootstrap,
mountSmallFilesBootstrap,
executorInitContainerBootstrap,
executorInitContainerMountSecretsBootstrap,
executorInitContainerSecretVolumePlugin,
executorLocalDirVolumeProvider)
val allocatorExecutor = ThreadUtils
Expand Down
Loading

0 comments on commit 0612195

Please sign in to comment.