diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 8bb87603d3a..95c7e82a043 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -170,6 +170,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.jdbc.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the JDBC engine. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 | | kyuubi.engine.jdbc.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. | seq | 1.8.0 | | kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 | +| kyuubi.engine.keytab | <undefined> | Kerberos keytab for the kyuubi engine. | string | 1.10.0 | | kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 | | kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 | | kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 | @@ -177,6 +178,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session. | string | 1.7.0 | | kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 | | kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 | +| kyuubi.engine.principal | <undefined> | Kerberos principal for the kyuubi engine. | string | 1.10.0 | | kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 | | kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are: See also `kyuubi.engine.share.level.subdomain` and `kyuubi.engine.doAs.enabled`. | string | 1.2.0 | | kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 | diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala index 4e0787039bc..24876456c62 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala @@ -27,6 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_DEPLOY_MODE, ENGINE_KEYTAB, ENGINE_PRINCIPAL} +import org.apache.kyuubi.engine.deploy.DeployMode import org.apache.kyuubi.engine.hive.HiveSQLEngine.currentEngine import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveEventHandlerRegister} import org.apache.kyuubi.events.EventBus @@ -133,26 +135,37 @@ object HiveSQLEngine extends Logging { SignalRegister.registerLogger(logger) try { Utils.fromCommandLineArgs(args, kyuubiConf) - val sessionUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY) + val proxyUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY) + require(proxyUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set") val realUser = UserGroupInformation.getLoginUser - - if (sessionUser.isEmpty || sessionUser.get == realUser.getShortUserName) { - startEngine() - } else { - val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser) - effectiveUser.doAs(new PrivilegedExceptionAction[Unit] { - override def run(): Unit = { - val engineCredentials = - kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) - kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) - engineCredentials.filter(_.nonEmpty).foreach { credentials => - HiveTBinaryFrontendService.renewDelegationToken(credentials) + val principal = kyuubiConf.get(ENGINE_PRINCIPAL) + val keytab = kyuubiConf.get(ENGINE_KEYTAB) + + val ugi = DeployMode.withName(kyuubiConf.get(ENGINE_HIVE_DEPLOY_MODE)) match { + case DeployMode.LOCAL + if UserGroupInformation.isSecurityEnabled && principal.isDefined && keytab.isDefined => + UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get) + UserGroupInformation.getCurrentUser + case DeployMode.LOCAL if proxyUser.get != realUser.getShortUserName => + val newUGI = UserGroupInformation.createProxyUser(proxyUser.get, realUser) + newUGI.doAs(new PrivilegedExceptionAction[Unit] { + override def run(): Unit = { + val engineCredentials = + kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) + kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) + engineCredentials.filter(_.nonEmpty).foreach { credentials => + HiveTBinaryFrontendService.renewDelegationToken(credentials) + } } - startEngine() - } - }) + }) + newUGI + case _ => + UserGroupInformation.getCurrentUser } + ugi.doAs(new PrivilegedExceptionAction[Unit] { + override def run(): Unit = startEngine() + }) } catch { case t: Throwable => currentEngine match { diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala index 9d5126ad668..a717f9dfd01 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala @@ -20,7 +20,10 @@ import java.io.File import scala.collection.mutable.ListBuffer +import org.apache.hadoop.security.UserGroupInformation + import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter import org.apache.kyuubi.engine.hive.HiveSQLEngine @@ -29,7 +32,16 @@ object HiveYarnModeSubmitter extends EngineYarnModeSubmitter { def main(args: Array[String]): Unit = { Utils.fromCommandLineArgs(args, kyuubiConf) - submitApplication() + + if (UserGroupInformation.isSecurityEnabled) { + require( + kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL).isDefined + && kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).isDefined, + s"${KyuubiConf.ENGINE_PRINCIPAL.key} and " + + s"${KyuubiConf.ENGINE_KEYTAB.key} must be set when submit " + + s"${HiveSQLEngine.getClass.getSimpleName.stripSuffix("$")} to YARN") + } + run() } override var engineType: String = "hive" diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala index 7db2d7fdca3..aa4d77d11b8 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala @@ -21,6 +21,7 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED +import org.apache.kyuubi.config.KyuubiReservedKeys import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.operation.HiveJDBCTestHelper import org.apache.kyuubi.util.command.CommandLineUtils._ @@ -34,7 +35,9 @@ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper { CONF, s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true", CONF, - s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true") + s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true", + CONF, + s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi") HiveSQLEngine.main(args) super.beforeAll() } diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala index 53cc9457ae1..8150bea32c3 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.hive.operation import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils} +import org.apache.kyuubi.config.KyuubiReservedKeys import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.jdbc.hive.KyuubiStatement import org.apache.kyuubi.util.command.CommandLineUtils._ @@ -31,7 +32,9 @@ class HiveOperationSuite extends HiveEngineTests { metastore.toFile.delete() val args = Array( CONF, - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true") + s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true", + CONF, + s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi") HiveSQLEngine.main(args) super.beforeAll() } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index da8b3395ac1..78d440a3170 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2836,6 +2836,20 @@ object KyuubiConf { .stringConf .createOptional + val ENGINE_PRINCIPAL: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.principal") + .doc("Kerberos principal for the kyuubi engine.") + .version("1.10.0") + .stringConf + .createOptional + + val ENGINE_KEYTAB: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.keytab") + .doc("Kerberos keytab for the kyuubi engine.") + .version("1.10.0") + .stringConf + .createOptional + val ENGINE_FLINK_MEMORY: ConfigEntry[String] = buildConf("kyuubi.engine.flink.memory") .doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.") diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala index 3e396beb070..1993009adce 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala @@ -17,17 +17,20 @@ package org.apache.kyuubi.engine.deploy.yarn import java.io.{File, IOException} +import java.security.PrivilegedExceptionAction import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier -import org.apache.kyuubi.{Logging, Utils} -import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.{KyuubiException, Logging, Utils} +import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.service.Serverable import org.apache.kyuubi.util.KyuubiHadoopUtils import org.apache.kyuubi.util.command.CommandLineUtils.confKeyValues @@ -71,7 +74,43 @@ object ApplicationMaster extends Logging { unregister(finalStatus, finalMsg) } }) - runApplicationMaster() + + val ugi = kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL) match { + case Some(principalName) if UserGroupInformation.isSecurityEnabled => + val originalCreds = UserGroupInformation.getCurrentUser().getCredentials() + val keytabFilename = kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).orNull + if (!new File(keytabFilename).exists()) { + throw new KyuubiException(s"Keytab file: $keytabFilename does not exist") + } else { + info("Attempting to login to Kerberos " + + s"using principal: $principalName and keytab: $keytabFilename") + UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) + } + + val newUGI = UserGroupInformation.getCurrentUser() + + // Transfer YARN_AM_RM_TOKEN to the new user. + // Not transfer other tokens, such as HDFS_DELEGATION_TOKEN, + // to avoid "org.apache.hadoop.ipc.RemoteException(java.io.IOException): + // Delegation Token can be issued only with kerberos or web authentication" + // when engine tries to obtain new tokens. + KyuubiHadoopUtils.getTokenMap(originalCreds).values + .find(_.getKind == AMRMTokenIdentifier.KIND_NAME) + .foreach { token => + newUGI.addToken(token) + } + newUGI + case _ => + val appUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY) + require(appUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set") + val newUGI = UserGroupInformation.createRemoteUser(appUser.get) + newUGI.addCredentials(UserGroupInformation.getCurrentUser.getCredentials) + newUGI + } + + ugi.doAs(new PrivilegedExceptionAction[Unit] { + override def run(): Unit = runApplicationMaster() + }) } catch { case t: Throwable => error("Error running ApplicationMaster", t) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala index 552a3158f3d..e2913539b46 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala @@ -17,10 +17,12 @@ package org.apache.kyuubi.engine.deploy.yarn import java.io._ +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.nio.file.Files +import java.security.PrivilegedExceptionAction import java.util -import java.util.{Locale, Properties} +import java.util.{Locale, Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ @@ -30,7 +32,8 @@ import scala.collection.mutable.ListBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.mapred.Master +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records._ @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.util.Records import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._ import org.apache.kyuubi.util.KyuubiHadoopUtils @@ -81,6 +85,9 @@ abstract class EngineYarnModeSubmitter extends Logging { var yarnConf: Configuration = _ var hadoopConf: Configuration = _ + var appUser: String = _ + var keytab: String = _ + var amKeytabFileName: Option[String] = _ var engineType: String @@ -91,9 +98,35 @@ abstract class EngineYarnModeSubmitter extends Logging { */ def engineExtraJars(): Seq[File] = Seq.empty - protected def submitApplication(): Unit = { + def run(): Unit = { yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf) hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf) + appUser = kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).orNull + require(appUser != null, s"$KYUUBI_SESSION_USER_KEY is not set") + keytab = kyuubiConf.get(ENGINE_KEYTAB).orNull + val principal = kyuubiConf.get(ENGINE_PRINCIPAL).orNull + amKeytabFileName = + if (UserGroupInformation.isSecurityEnabled && principal != null && keytab != null) { + info(s"Kerberos credentials: principal = $principal, keytab = $keytab") + UserGroupInformation.loginUserFromKeytab(principal, keytab) + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString) + } else { + None + } + + val ugi = if (UserGroupInformation.getCurrentUser.getShortUserName == appUser) { + UserGroupInformation.getCurrentUser + } else { + UserGroupInformation.createProxyUser(appUser, UserGroupInformation.getCurrentUser) + } + ugi.doAs(new PrivilegedExceptionAction[Unit] { + override def run(): Unit = submitApplication() + }) + } + + protected def submitApplication(): Unit = { try { yarnClient = YarnClient.createYarnClient() yarnClient.init(yarnConf) @@ -134,6 +167,29 @@ abstract class EngineYarnModeSubmitter extends Logging { } } + private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { + if (UserGroupInformation.isSecurityEnabled) { + val credentials = obtainHadoopFsDelegationToken() + val serializedCreds = KyuubiHadoopUtils.serializeCredentials(credentials) + amContainer.setTokens(ByteBuffer.wrap(serializedCreds)) + } + } + + private def obtainHadoopFsDelegationToken(): Credentials = { + val tokenRenewer = Master.getMasterPrincipal(hadoopConf) + info(s"Delegation token renewer is: $tokenRenewer") + + if (tokenRenewer == null || tokenRenewer.isEmpty) { + val errorMessage = "Can't get Master Kerberos principal for use as renewer." + error(errorMessage) + throw new KyuubiException(errorMessage) + } + + val credentials = new Credentials() + FileSystem.get(hadoopConf).addDelegationTokens(tokenRenewer, credentials) + credentials + } + private def createContainerLaunchContext(): ContainerLaunchContext = { info("Setting up container launch context for engine AM") val env = setupLaunchEnv(kyuubiConf) @@ -171,6 +227,7 @@ abstract class EngineYarnModeSubmitter extends Logging { amContainer.setCommands(printableCommands.asJava) info(s"Commands: ${printableCommands.mkString(" ")}") + setupSecurityToken(amContainer) amContainer } @@ -187,6 +244,19 @@ abstract class EngineYarnModeSubmitter extends Logging { distributeJars(localResources, env) distributeConf(localResources, env) + + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. + amKeytabFileName.foreach { kt => + info("To enable the AM to login from keytab, credentials are being copied over to the AM" + + " via the YARN Secure Distributed Cache.") + distribute( + new Path(new File(keytab).toURI), + LocalResourceType.FILE, + kt, + localResources) + } + localResources } @@ -253,6 +323,7 @@ abstract class EngineYarnModeSubmitter extends Logging { listDistinctFiles(yarnConf.get).foreach(putEntry) val properties = confToProperties(kyuubiConf) + amKeytabFileName.foreach(kt => properties.put(ENGINE_KEYTAB.key, kt)) writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream) } finally { confStream.close() diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala index 2d9ea4a8ad5..ad083f17362 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala @@ -72,6 +72,21 @@ object KyuubiHadoopUtils extends Logging { creds } + def serializeCredentials(creds: Credentials): Array[Byte] = { + val byteStream = new ByteArrayOutputStream + val dataStream = new DataOutputStream(byteStream) + creds.writeTokenStorageToStream(dataStream) + byteStream.toByteArray + } + + def deserializeCredentials(tokenBytes: Array[Byte]): Credentials = { + val tokensBuf = new ByteArrayInputStream(tokenBytes) + + val creds = new Credentials() + creds.readTokenStorageStream(new DataInputStream(tokensBuf)) + creds + } + /** * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before * Hadoop 3.2.1. diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala index 903e06575cc..da035fcf97d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala @@ -23,10 +23,11 @@ import java.nio.file.{Files, Paths} import scala.collection.mutable import com.google.common.annotations.VisibleForTesting +import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME, ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY} +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME, ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY, ENGINE_KEYTAB, ENGINE_PRINCIPAL} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY} import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.deploy.DeployMode @@ -121,19 +122,49 @@ object HiveProcessBuilder extends Logging { final val HIVE_ENGINE_NAME = "hive.engine.name" def apply( - appUser: String, + proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf, engineRefId: String, extraEngineLog: Option[OperationLog], defaultEngineName: String): HiveProcessBuilder = { + checkKeytab(proxyUser, conf) DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match { - case LOCAL => new HiveProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) + case LOCAL => + new HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog) case YARN => warn(s"Hive on YARN model is experimental.") conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName)) - new HiveYarnModeProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) + new HiveYarnModeProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog) case other => throw new KyuubiException(s"Unsupported deploy mode: $other") } } + + private def checkKeytab(proxyUser: String, conf: KyuubiConf): Unit = { + val principal = conf.get(ENGINE_PRINCIPAL) + val keytab = conf.get(ENGINE_KEYTAB) + if (!UserGroupInformation.isSecurityEnabled) { + if (principal.isDefined || keytab.isDefined) { + warn("Principal and keytab takes no effect when hadoop security is not enabled.") + } + return + } + + require( + principal.isDefined == keytab.isDefined, + s"Both principal and keytab must be defined, or neither.") + if (principal.isDefined && keytab.isDefined) { + val ugi = UserGroupInformation + .loginUserFromKeytabAndReturnUGI(principal.get, keytab.get) + require( + ugi.getShortUserName == proxyUser, + s"Proxy user: $proxyUser is not same with " + + s"engine principal: ${ugi.getShortUserName}.") + } + + val deployMode = DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) + if (principal.isEmpty && keytab.isEmpty && deployMode == YARN) { + warn("Hive on YARN can not work properly without principal and keytab.") + } + } }