Skip to content

Commit

Permalink
[KYUUBI #6003] Allow disabling user impersonation on launching engine
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

HiveServer2 has a configuration `hive.server2.enable.doAs` to control the execution user between the session user and the server user, Kyuubi's CONNECTION and USER share levels always perform like doAs enabled do. In CDH 5/6, this is disabled by default, users who want to migrate from CDH to Kyuubi may encounter permission issues with the current implementation.

## Describe Your Solution 🔧

This pull request introduces a new configuration `kyuubi.engine.doAs.enabled` to allow enable/disable user impersonation on launching engine. For security purpose, it's not allowed to be overridden by session conf.

The change in this PR has certain limitations:

- only supports Spark engine
- only supports interactive mode, specifically, it does not take effect on Spark batch mode now.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

The first step is passing all existing UTs when `kyuubi.engine.doAs.enabled=true`.

Tested on internal Kerberized-environment, when `kyuubi.engine.share.level=CONNECTION` and `kyuubi.engine.doAs.enabled=false`, use user 'spark' to launch engine, and the engine submitted without `--proxy-user spark`, thus engine launched by server user `hive`, then run `select session_user(), current_user()` and returns

```
+-----------------+-----------------+
| session_user()  | current_user()  |
+-----------------+-----------------+
| spark           | hive            |
+-----------------+-----------------+
```

And I checked the `spark.app.name` and registered path on Zookeeper also expected.
```
+-----------------+--------------------------------------------------------------------------+
|       key       |                       value                                              |
+-----------------+--------------------------------------------------------------------------+
| spark.app.name  | kyuubi_USER_SPARK_SQL_spark_default_51a416e5-6023-4bac-a964-cd9605f17c61 |
+-----------------+--------------------------------------------------------------------------+
```

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6003 from pan3793/doas.

Closes #6003

c4002fe [Cheng Pan] grammar
add20fd [Cheng Pan] nit
8711c22 [Cheng Pan] address comment
033a322 [Cheng Pan] 1.9.0
9273b94 [Cheng Pan] fix
a1563e1 [Cheng Pan] HadoopCredentialsManager
e982e23 [Cheng Pan] Allow disable user impersonation on launching engine

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
pan3793 committed Jan 29, 2024
1 parent d474768 commit 3f993f4
Show file tree
Hide file tree
Showing 31 changed files with 355 additions and 238 deletions.
191 changes: 96 additions & 95 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
test("Check if spark.kubernetes.executor.podNamePrefix is invalid") {
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", conf)
val builder = new SparkProcessBuilder("test", true, conf)
val e = intercept[KyuubiException](builder.validateConf)
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,20 +2087,33 @@ object KyuubiConf {
.version("1.5.0")
.fallbackConf(ENGINE_CONNECTION_URL_USE_HOSTNAME)

val ENGINE_DO_AS_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.doAs.enabled")
.doc("Whether to enable user impersonation on launching engine. When enabled, " +
"for engines which supports user impersonation, e.g. SPARK, depends on the " +
s"`kyuubi.engine.share.level`, different users will be used to launch the engine. " +
"Otherwise, Kyuubi Server's user will always be used to launch the engine.")
.version("1.9.0")
.booleanConf
.createWithDefault(true)

val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level")
.doc("Engines will be shared in different levels, available configs are: <ul>" +
" <li>CONNECTION: engine will not be shared but only used by the current client" +
" connection</li>" +
" <li>USER: engine will be shared by all sessions created by a unique username," +
s" see also ${ENGINE_SHARE_LEVEL_SUBDOMAIN.key}</li>" +
" <li>CONNECTION: the engine will not be shared but only used by the current client" +
" connection, and the engine will be launched by session user.</li>" +
" <li>USER: the engine will be shared by all sessions created by a unique username," +
s" and the engine will be launched by session user.</li>" +
" <li>GROUP: the engine will be shared by all sessions created" +
" by all users belong to the same primary group name." +
" The engine will be launched by the group name as the effective" +
" The engine will be launched by the primary group name as the effective" +
" username, so here the group name is in value of special user who is able to visit the" +
" computing resources/data of the team. It follows the" +
" [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the" +
" primary group is not found, it fallback to the USER level." +
" <li>SERVER: the App will be shared by Kyuubi servers</li></ul>")
" <li>SERVER: the engine will be shared by Kyuubi servers, and the engine will be launched" +
" by Server's user.</li>" +
" </ul>" +
s" See also `${ENGINE_SHARE_LEVEL_SUBDOMAIN.key}` and `${ENGINE_DO_AS_ENABLED.key}`.")
.version("1.2.0")
.fallbackConf(LEGACY_ENGINE_SHARE_LEVEL)

Expand All @@ -2115,8 +2128,8 @@ object KyuubiConf {
" all the capacity of the Trino.</li>" +
" <li>HIVE_SQL: specify this engine type will launch a Hive engine which can provide" +
" all the capacity of the Hive Server2.</li>" +
" <li>JDBC: specify this engine type will launch a JDBC engine which can forward " +
" queries to the database system through the certain JDBC driver, " +
" <li>JDBC: specify this engine type will launch a JDBC engine which can forward" +
" queries to the database system through the certain JDBC driver," +
" for now, it supports Doris, MySQL, Phoenix, PostgreSQL and StarRocks.</li>" +
" <li>CHAT: specify this engine type will launch a Chat engine.</li>" +
"</ul>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ import org.apache.kyuubi.server.KyuubiServer
* The description and functionality of an engine at server side
*
* @param conf Engine configuration
* @param user Caller of the engine
* @param sessionUser Caller of the engine
* @param engineRefId Id of the corresponding session in which the engine is created
*/
private[kyuubi] class EngineRef(
conf: KyuubiConf,
user: String,
sessionUser: String,
doAsEnabled: Boolean,
groupProvider: GroupProvider,
engineRefId: String,
engineManager: KyuubiApplicationManager,
Expand Down Expand Up @@ -88,15 +89,18 @@ private[kyuubi] class EngineRef(

private var builder: ProcBuilder = _

private[kyuubi] def getEngineRefId(): String = engineRefId
private[kyuubi] def getEngineRefId: String = engineRefId

// Launcher of the engine
private[kyuubi] val appUser: String = shareLevel match {
// user for routing session to the engine
private[kyuubi] val routingUser: String = shareLevel match {
case SERVER => Utils.currentUser
case GROUP => groupProvider.primaryGroup(user, conf.getAll.asJava)
case _ => user
case GROUP => groupProvider.primaryGroup(sessionUser, conf.getAll.asJava)
case _ => sessionUser
}

// user for launching engine
private[kyuubi] val appUser: String = if (doAsEnabled) routingUser else Utils.currentUser

@VisibleForTesting
private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
case subdomain if clientPoolSize > 0 && (subdomain.isEmpty || enginePoolIgnoreSubdomain) =>
Expand All @@ -110,7 +114,7 @@ private[kyuubi] class EngineRef(
val snPath =
DiscoveryPaths.makePath(
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum",
appUser,
routingUser,
clientPoolName)
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
client.getAndIncrement(snPath)
Expand All @@ -128,7 +132,7 @@ private[kyuubi] class EngineRef(
*/
@VisibleForTesting
private[kyuubi] val defaultEngineName: String = {
val commonNamePrefix = s"kyuubi_${shareLevel}_${engineType}_${appUser}"
val commonNamePrefix = s"kyuubi_${shareLevel}_${engineType}_${routingUser}"
shareLevel match {
case CONNECTION => s"${commonNamePrefix}_$engineRefId"
case _ => s"${commonNamePrefix}_${subdomain}_$engineRefId"
Expand All @@ -151,8 +155,8 @@ private[kyuubi] class EngineRef(
private[kyuubi] lazy val engineSpace: String = {
val commonParent = s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType"
shareLevel match {
case CONNECTION => DiscoveryPaths.makePath(commonParent, appUser, engineRefId)
case _ => DiscoveryPaths.makePath(commonParent, appUser, subdomain)
case CONNECTION => DiscoveryPaths.makePath(commonParent, routingUser, engineRefId)
case _ => DiscoveryPaths.makePath(commonParent, routingUser, subdomain)
}
}

Expand All @@ -167,7 +171,7 @@ private[kyuubi] class EngineRef(
val lockPath =
DiscoveryPaths.makePath(
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock",
appUser,
routingUser,
subdomain)
discoveryClient.tryWithLock(
lockPath,
Expand All @@ -188,19 +192,25 @@ private[kyuubi] class EngineRef(
builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case FLINK_SQL =>
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case HIVE_SQL =>
conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName)
HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog, defaultEngineName)
HiveProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
extraEngineLog,
defaultEngineName)
case JDBC =>
new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new JdbcProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case CHAT =>
new ChatProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
}

MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
Expand All @@ -222,7 +232,7 @@ private[kyuubi] class EngineRef(
while (engineRef.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue != Some(0)) {
if (!exitValue.contains(0)) {
val error = builder.getError
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
Expand All @@ -246,7 +256,7 @@ private[kyuubi] class EngineRef(

// even the submit process succeeds, the application might meet failure when initializing,
// check the engine application state from engine manager and fast fail on engine terminate
if (engineRef.isEmpty && exitValue == Some(0)) {
if (engineRef.isEmpty && exitValue.contains(0)) {
Option(engineManager).foreach { engineMgr =>
if (lastApplicationInfo.isDefined) {
TimeUnit.SECONDS.sleep(1)
Expand Down Expand Up @@ -341,7 +351,7 @@ private[kyuubi] class EngineRef(
discoveryClient: DiscoveryClient,
hostPort: (String, Int)): Option[ServiceNodeInfo] = {
val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace)
serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption
serviceNodes.find { sn => (sn.host, sn.port) == hostPort }
}

def close(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ trait ProcBuilder {

protected def proxyUser: String

protected def doAsEnabled: Boolean

protected val commands: Iterable[String]

def conf: KyuubiConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._

class ChatProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._
*/
class FlinkProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

val flinkHome: String = getEngineHome(shortName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._

class HiveProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

protected val hiveHome: String = getEngineHome(shortName)
Expand Down Expand Up @@ -121,16 +122,17 @@ object HiveProcessBuilder extends Logging {

def apply(
appUser: String,
doAsEnabled: Boolean,
conf: KyuubiConf,
engineRefId: String,
extraEngineLog: Option[OperationLog],
defaultEngineName: String): HiveProcessBuilder = {
DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
case LOCAL => new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case LOCAL => new HiveProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case YARN =>
warn(s"Hive on YARN model is experimental.")
conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
new HiveYarnModeProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new HiveYarnModeProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ import org.apache.kyuubi.util.command.CommandLineUtils.{confKeyValue, confKeyVal
*/
class HiveYarnModeProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
override val engineRefId: String,
override val extraEngineLog: Option[OperationLog] = None)
extends HiveProcessBuilder(proxyUser, conf, engineRefId, extraEngineLog) with Logging {
extends HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
with Logging {

override protected def mainClass: String =
"org.apache.kyuubi.engine.hive.deploy.HiveYarnModeSubmitter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._

class JdbcProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class SparkBatchProcessBuilder(
batchConf: Map[String, String],
batchArgs: Seq[String],
override val extraEngineLog: Option[OperationLog])
extends SparkProcessBuilder(proxyUser, conf, batchId, extraEngineLog) {
// TODO respect doAsEnabled
extends SparkProcessBuilder(proxyUser, true, conf, batchId, extraEngineLog) {
import SparkProcessBuilder._

override protected lazy val commands: Iterable[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManage
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT}
import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{KubernetesUtils, Validator}
import org.apache.kyuubi.util.command.CommandLineUtils._

class SparkProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

import SparkProcessBuilder._
Expand Down Expand Up @@ -135,14 +137,12 @@ class SparkProcessBuilder(
var allConf = conf.getAll

// if enable sasl kerberos authentication for zookeeper, need to upload the server keytab file
if (AuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
== AuthTypes.KERBEROS) {
if (AuthTypes.withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) {
allConf = allConf ++ zkAuthKeytabFileConf(allConf)
}
// pass spark engine log path to spark conf
(allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach {
case (k, v) =>
buffer ++= confKeyValue(convertConfigKey(k), v)
case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
}

setupKerberos(buffer)
Expand All @@ -157,10 +157,12 @@ class SparkProcessBuilder(
protected def setupKerberos(buffer: mutable.Buffer[String]): Unit = {
// if the keytab is specified, PROXY_USER is not supported
tryKeytab() match {
case None =>
case None if doAsEnabled =>
setSparkUserName(proxyUser, buffer)
buffer += PROXY_USER
buffer += proxyUser
case None => // doAs disabled
setSparkUserName(Utils.currentUser, buffer)
case Some(name) =>
setSparkUserName(name, buffer)
}
Expand All @@ -175,10 +177,15 @@ class SparkProcessBuilder(
try {
val ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
if (ugi.getShortUserName != proxyUser) {
if (doAsEnabled && ugi.getShortUserName != proxyUser) {
warn(s"The session proxy user: $proxyUser is not same with " +
s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " +
s"Fallback to use proxy user.")
s"spark principal: ${ugi.getShortUserName}, skip using keytab. " +
"Fallback to use proxy user.")
None
} else if (!doAsEnabled && ugi.getShortUserName != Utils.currentUser) {
warn(s"The server's user: ${Utils.currentUser} is not same with " +
s"spark principal: ${ugi.getShortUserName}, skip using keytab. " +
"Fallback to use server's user.")
None
} else {
Some(ugi.getShortUserName)
Expand Down
Loading

0 comments on commit 3f993f4

Please sign in to comment.