Skip to content

Commit

Permalink
[KYUUBI #6199] Support to run HiveSQLEngine on kerberized YARN
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

This pull request implement a feature -  Run HiveSQLEngine on kerberized YARN

## Describe Your Solution 🔧
Introduced two configs:
- kyuubi.engine.principal
- kyuubi.engine.keytab

When do submit to a kerberized YARN, submitter uploads `kyuubi.engine.keytab` to application's staging dir.
YARN NodeManager downloads keytab to AM's working directory. AM logins to Kerberos using the principal and keytab

**Note**
I've tried to run HiveSQLEngine with only DelegationTokens but failed.

Take SQL `SELECT * FROM a` as an example:
Hive handles this simple TableScan SQL by reading directly from table's hdfs file.
When Hive invokes `FileInputFormat.getSplits` during reading, `java.io.IOException: Delegation Token can be issued only with kerberos or web authentication` will be thrown.
The simplified stacktrace from IDEA is as below:
```
getDelegationToken:734, DFSClient (org.apache.hadoop.hdfs)
getDelegationToken:2072, DistributedFileSystem (org.apache.hadoop.hdfs)
collectDelegationTokens:108, DelegationTokenIssuer (org.apache.hadoop.security.token)
addDelegationTokens:83, DelegationTokenIssuer (org.apache.hadoop.security.token)
obtainTokensForNamenodesInternal:143, TokenCache (org.apache.hadoop.mapreduce.security)
obtainTokensForNamenodesInternal:102, TokenCache (org.apache.hadoop.mapreduce.security)
obtainTokensForNamenodes:81, TokenCache (org.apache.hadoop.mapreduce.security)
listStatus:221, FileInputFormat (org.apache.hadoop.mapred)
getSplits:332, FileInputFormat (org.apache.hadoop.mapred)
getNextSplits:372, FetchOperator (org.apache.hadoop.hive.ql.exec)
getRecordReader:304, FetchOperator (org.apache.hadoop.hive.ql.exec)
getNextRow:459, FetchOperator (org.apache.hadoop.hive.ql.exec)
pushRow:428, FetchOperator (org.apache.hadoop.hive.ql.exec)
fetch:147, FetchTask (org.apache.hadoop.hive.ql.exec)
getResults:2208, Driver (org.apache.hadoop.hive.ql)
getNextRowSet:494, SQLOperation (org.apache.hive.service.cli.operation)
getNextRowSetInternal:105, HiveOperation (org.apache.kyuubi.engine.hive.operation)
```

Theoretically, it can be solved by add AM DelegationTokens into
 `org.apache.hadoop.hive.ql.exec.FetchOperator.job.credentials`.
But actually, it is impossible without modifying Hive's source code.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️
HiveSQLEngine can not run on a kerberized YARN

#### Behavior With This Pull Request 🎉
HiveSQLEngine can run on a kerberized YARN

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6199 from zhouyifan279/kerberized-hive-engine-on-yarn.

Closes #6199

383d1cd [zhouyifan279] Fix tests
458493a [zhouyifan279] Warn if run Hive on YARN without principal and keytab
118afe2 [zhouyifan279] Warn if run Hive on YARN without principal and keytab
41fed0c [zhouyifan279] Ignore Principal&Keytab when hadoop security is no enabled.
9e2d862 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
5ae0a3e [zhouyifan279] Remove redundant checks
5d3013a [zhouyifan279] Use principal & keytab in Local mode
5733dfd [zhouyifan279] Use principal & keytab in Local mode
85ce9bb [zhouyifan279] Use principal & keytab in Local mode
061223d [zhouyifan279] Resolve comments
e706936 [zhouyifan279] Resolve comments
f84c7bc [zhouyifan279] Support run HiveSQLEngine on kerberized YARN
4d262c8 [zhouyifan279] Support run HiveSQLEngine on kerberized YARN

Lead-authored-by: zhouyifan279 <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
zhouyifan279 and pan3793 committed Mar 22, 2024
1 parent e1861d9 commit a196ace
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 29 deletions.
2 changes: 2 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_DEPLOY_MODE, ENGINE_KEYTAB, ENGINE_PRINCIPAL}
import org.apache.kyuubi.engine.deploy.DeployMode
import org.apache.kyuubi.engine.hive.HiveSQLEngine.currentEngine
import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveEventHandlerRegister}
import org.apache.kyuubi.events.EventBus
Expand Down Expand Up @@ -133,26 +135,37 @@ object HiveSQLEngine extends Logging {
SignalRegister.registerLogger(logger)
try {
Utils.fromCommandLineArgs(args, kyuubiConf)
val sessionUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
val proxyUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
require(proxyUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
val realUser = UserGroupInformation.getLoginUser

if (sessionUser.isEmpty || sessionUser.get == realUser.getShortUserName) {
startEngine()
} else {
val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser)
effectiveUser.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
HiveTBinaryFrontendService.renewDelegationToken(credentials)
val principal = kyuubiConf.get(ENGINE_PRINCIPAL)
val keytab = kyuubiConf.get(ENGINE_KEYTAB)

val ugi = DeployMode.withName(kyuubiConf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
case DeployMode.LOCAL
if UserGroupInformation.isSecurityEnabled && principal.isDefined && keytab.isDefined =>
UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get)
UserGroupInformation.getCurrentUser
case DeployMode.LOCAL if proxyUser.get != realUser.getShortUserName =>
val newUGI = UserGroupInformation.createProxyUser(proxyUser.get, realUser)
newUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
HiveTBinaryFrontendService.renewDelegationToken(credentials)
}
}
startEngine()
}
})
})
newUGI
case _ =>
UserGroupInformation.getCurrentUser
}

ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = startEngine()
})
} catch {
case t: Throwable =>
currentEngine match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine
Expand All @@ -29,7 +32,16 @@ object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)
submitApplication()

if (UserGroupInformation.isSecurityEnabled) {
require(
kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL).isDefined
&& kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).isDefined,
s"${KyuubiConf.ENGINE_PRINCIPAL.key} and " +
s"${KyuubiConf.ENGINE_KEYTAB.key} must be set when submit " +
s"${HiveSQLEngine.getClass.getSimpleName.stripSuffix("$")} to YARN")
}
run()
}

override var engineType: String = "hive"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils}

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.util.command.CommandLineUtils._
Expand All @@ -34,7 +35,9 @@ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper {
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
CONF,
s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true")
s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true",
CONF,
s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi")
HiveSQLEngine.main(args)
super.beforeAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.hive.operation
import org.apache.commons.lang3.{JavaVersion, SystemUtils}

import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.util.command.CommandLineUtils._
Expand All @@ -31,7 +32,9 @@ class HiveOperationSuite extends HiveEngineTests {
metastore.toFile.delete()
val args = Array(
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true")
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
CONF,
s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi")
HiveSQLEngine.main(args)
super.beforeAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2836,6 +2836,20 @@ object KyuubiConf {
.stringConf
.createOptional

val ENGINE_PRINCIPAL: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.principal")
.doc("Kerberos principal for the kyuubi engine.")
.version("1.10.0")
.stringConf
.createOptional

val ENGINE_KEYTAB: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.keytab")
.doc("Kerberos keytab for the kyuubi engine.")
.version("1.10.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
package org.apache.kyuubi.engine.deploy.yarn

import java.io.{File, IOException}
import java.security.PrivilegedExceptionAction

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.KyuubiHadoopUtils
import org.apache.kyuubi.util.command.CommandLineUtils.confKeyValues
Expand Down Expand Up @@ -71,7 +74,43 @@ object ApplicationMaster extends Logging {
unregister(finalStatus, finalMsg)
}
})
runApplicationMaster()

val ugi = kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL) match {
case Some(principalName) if UserGroupInformation.isSecurityEnabled =>
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
val keytabFilename = kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).orNull
if (!new File(keytabFilename).exists()) {
throw new KyuubiException(s"Keytab file: $keytabFilename does not exist")
} else {
info("Attempting to login to Kerberos " +
s"using principal: $principalName and keytab: $keytabFilename")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

val newUGI = UserGroupInformation.getCurrentUser()

// Transfer YARN_AM_RM_TOKEN to the new user.
// Not transfer other tokens, such as HDFS_DELEGATION_TOKEN,
// to avoid "org.apache.hadoop.ipc.RemoteException(java.io.IOException):
// Delegation Token can be issued only with kerberos or web authentication"
// when engine tries to obtain new tokens.
KyuubiHadoopUtils.getTokenMap(originalCreds).values
.find(_.getKind == AMRMTokenIdentifier.KIND_NAME)
.foreach { token =>
newUGI.addToken(token)
}
newUGI
case _ =>
val appUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
require(appUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
val newUGI = UserGroupInformation.createRemoteUser(appUser.get)
newUGI.addCredentials(UserGroupInformation.getCurrentUser.getCredentials)
newUGI
}

ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = runApplicationMaster()
})
} catch {
case t: Throwable =>
error("Error running ApplicationMaster", t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.kyuubi.engine.deploy.yarn

import java.io._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.PrivilegedExceptionAction
import java.util
import java.util.{Locale, Properties}
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
Expand All @@ -30,7 +32,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
Expand All @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
import org.apache.kyuubi.util.KyuubiHadoopUtils

Expand Down Expand Up @@ -81,6 +85,9 @@ abstract class EngineYarnModeSubmitter extends Logging {

var yarnConf: Configuration = _
var hadoopConf: Configuration = _
var appUser: String = _
var keytab: String = _
var amKeytabFileName: Option[String] = _

var engineType: String

Expand All @@ -91,9 +98,35 @@ abstract class EngineYarnModeSubmitter extends Logging {
*/
def engineExtraJars(): Seq[File] = Seq.empty

protected def submitApplication(): Unit = {
def run(): Unit = {
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
appUser = kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).orNull
require(appUser != null, s"$KYUUBI_SESSION_USER_KEY is not set")
keytab = kyuubiConf.get(ENGINE_KEYTAB).orNull
val principal = kyuubiConf.get(ENGINE_PRINCIPAL).orNull
amKeytabFileName =
if (UserGroupInformation.isSecurityEnabled && principal != null && keytab != null) {
info(s"Kerberos credentials: principal = $principal, keytab = $keytab")
UserGroupInformation.loginUserFromKeytab(principal, keytab)
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
} else {
None
}

val ugi = if (UserGroupInformation.getCurrentUser.getShortUserName == appUser) {
UserGroupInformation.getCurrentUser
} else {
UserGroupInformation.createProxyUser(appUser, UserGroupInformation.getCurrentUser)
}
ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = submitApplication()
})
}

protected def submitApplication(): Unit = {
try {
yarnClient = YarnClient.createYarnClient()
yarnClient.init(yarnConf)
Expand Down Expand Up @@ -134,6 +167,29 @@ abstract class EngineYarnModeSubmitter extends Logging {
}
}

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
if (UserGroupInformation.isSecurityEnabled) {
val credentials = obtainHadoopFsDelegationToken()
val serializedCreds = KyuubiHadoopUtils.serializeCredentials(credentials)
amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
}
}

private def obtainHadoopFsDelegationToken(): Credentials = {
val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
info(s"Delegation token renewer is: $tokenRenewer")

if (tokenRenewer == null || tokenRenewer.isEmpty) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer."
error(errorMessage)
throw new KyuubiException(errorMessage)
}

val credentials = new Credentials()
FileSystem.get(hadoopConf).addDelegationTokens(tokenRenewer, credentials)
credentials
}

private def createContainerLaunchContext(): ContainerLaunchContext = {
info("Setting up container launch context for engine AM")
val env = setupLaunchEnv(kyuubiConf)
Expand Down Expand Up @@ -171,6 +227,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
amContainer.setCommands(printableCommands.asJava)
info(s"Commands: ${printableCommands.mkString(" ")}")

setupSecurityToken(amContainer)
amContainer
}

Expand All @@ -187,6 +244,19 @@ abstract class EngineYarnModeSubmitter extends Logging {

distributeJars(localResources, env)
distributeConf(localResources, env)

// If we passed in a keytab, make sure we copy the keytab to the staging directory on
// HDFS, and setup the relevant environment vars, so the AM can login again.
amKeytabFileName.foreach { kt =>
info("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
distribute(
new Path(new File(keytab).toURI),
LocalResourceType.FILE,
kt,
localResources)
}

localResources
}

Expand Down Expand Up @@ -253,6 +323,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
listDistinctFiles(yarnConf.get).foreach(putEntry)

val properties = confToProperties(kyuubiConf)
amKeytabFileName.foreach(kt => properties.put(ENGINE_KEYTAB.key, kt))
writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream)
} finally {
confStream.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ object KyuubiHadoopUtils extends Logging {
creds
}

def serializeCredentials(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

def deserializeCredentials(tokenBytes: Array[Byte]): Credentials = {
val tokensBuf = new ByteArrayInputStream(tokenBytes)

val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(tokensBuf))
creds
}

/**
* Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before
* Hadoop 3.2.1.
Expand Down
Loading

0 comments on commit a196ace

Please sign in to comment.