[KYUUBI #5306] YarnApplicationOperation supports proxy user

### _Why are the changes needed?_

On a secured YARN cluster, the Kyuubi server user typically has no permission to kill applications. A proxy user or an admin user should be used instead.

https://docs.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html#concept_yarn_app_acls__section_killing_an_app

> For YARN, the following three groups of users are allowed to kill a running application:
> - The application owner
> - A cluster administrator defined in yarn.admin.acl
> - A queue administrator defined in aclAdministerApps for the queue in which the application is running
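
For illustration, the ADMIN mode could be enabled in `$KYUUBI_HOME/conf/kyuubi-defaults.conf` roughly as follows. This is a minimal sketch based only on the two properties this patch adds. It assumes the Kyuubi server user is already allowed to impersonate the admin user through Hadoop's proxy-user settings, which this patch does not configure.

```
# Build the YARN client for application management (e.g. kill) as a dedicated admin user
kyuubi.yarn.user.strategy=ADMIN
# Admin user to use when the strategy is ADMIN; `yarn` is the default value
kyuubi.yarn.user.admin=yarn
```

Setting `kyuubi.yarn.user.strategy` to `OWNER` instead would use the session user, and the default `NONE` keeps using the Kyuubi server user.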

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

Verified ADMIN mode in an internal deployment. (The output message is formatted for readability.)
```
Error: Batch e351185f-1ed8-437a-91bf-da2174e611e2 failed:
{
    "id":"e351185f-1ed8-437a-91bf-da2174e611e2",
    "user":"da_music",
    "batchType":"SPARK",
    "name":"SparkPi",
    "appStartTime":0,
    "appId":"application_1694730881181_58306",
    "appUrl":"http://xxxx-rm-2.xxxx:8088/cluster/app/application_1694730881181_58306",
    "appState":"KILLED",
    "appDiagnostic":"Application application_1694730881181_58306 was killed by user yarn at 10.49.59.149",
    "kyuubiInstance":"kyuubi-1.kyuubi-headless.spark.svc.cluster.local:10099",
    "state":"CANCELED",
    "createTime":1695102138188,
    "endTime":1695102163341,
    "batchInfo":{}
}
```

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5306 from pan3793/kill-proxy-user.

Closes #5306

2b2e54307 [Cheng Pan] address comments
e7e9a9c57 [Cheng Pan] nit
9cf2afc61 [Cheng Pan] polish
ff82d1230 [Cheng Pan] polish
bf0057b41 [Cheng Pan] ApplicationManager supports proxy user

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Cheng Pan 2023-09-20 11:34:13 +08:00
parent 18d043fcbd
commit cd325b48ae
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
11 changed files with 195 additions and 93 deletions

View File

@@ -460,6 +460,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.spnego.keytab | &lt;undefined&gt; | Keytab file for SPNego principal | string | 1.6.0 |
 | kyuubi.spnego.principal | &lt;undefined&gt; | SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when restful Kerberos security is enabled. This needs to be set only if SPNEGO is to be used in authentication. | string | 1.6.0 |
+### Yarn
+| Key | Default | Meaning | Type | Since |
+|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|-------|
+| kyuubi.yarn.user.admin | yarn | When kyuubi.yarn.user.strategy is set to ADMIN, use this admin user to construct YARN client for application management, e.g. kill application. | string | 1.8.0 |
+| kyuubi.yarn.user.strategy | NONE | Determine which user to use to construct YARN client for application management, e.g. kill application. Options: <ul><li>NONE: use Kyuubi server user.</li><li>ADMIN: use admin user configured in `kyuubi.yarn.user.admin`.</li><li>OWNER: use session user, typically is application owner.</li></ul> | string | 1.8.0 |
 ### Zookeeper
 | Key | Default | Meaning | Type | Since |

View File

@@ -21,6 +21,7 @@ import java.io._
 import java.net.{Inet4Address, InetAddress, NetworkInterface}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.security.PrivilegedAction
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties, TimeZone, UUID}
 import java.util.concurrent.TimeUnit
@@ -203,6 +204,14 @@ object Utils extends Logging {
   def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName
+  def doAs[T](
+      proxyUser: String,
+      realUser: UserGroupInformation = UserGroupInformation.getCurrentUser)(f: () => T): T = {
+    UserGroupInformation.createProxyUser(proxyUser, realUser).doAs(new PrivilegedAction[T] {
+      override def run(): T = f()
+    })
+  }
   private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
   /**

View File

@@ -2826,6 +2826,32 @@ object KyuubiConf {
       .version("1.7.2")
       .fallbackConf(ENGINE_SUBMIT_TIMEOUT)
+  object YarnUserStrategy extends Enumeration {
+    type YarnUserStrategy = Value
+    val NONE, ADMIN, OWNER = Value
+  }
+  val YARN_USER_STRATEGY: ConfigEntry[String] =
+    buildConf("kyuubi.yarn.user.strategy")
+      .doc("Determine which user to use to construct YARN client for application management, " +
+        "e.g. kill application. Options: <ul>" +
+        "<li>NONE: use Kyuubi server user.</li>" +
+        "<li>ADMIN: use admin user configured in `kyuubi.yarn.user.admin`.</li>" +
+        "<li>OWNER: use session user, typically is application owner.</li>" +
+        "</ul>")
+      .version("1.8.0")
+      .stringConf
+      .checkValues(YarnUserStrategy)
+      .createWithDefault("NONE")
+  val YARN_USER_ADMIN: ConfigEntry[String] =
+    buildConf("kyuubi.yarn.user.admin")
+      .doc(s"When ${YARN_USER_STRATEGY.key} is set to ADMIN, use this admin user to " +
+        "construct YARN client for application management, e.g. kill application.")
+      .version("1.8.0")
+      .stringConf
+      .createWithDefault("yarn")
   /**
    * Holds information about keys that have been deprecated.
    *

View File

@@ -47,11 +47,18 @@ trait ApplicationOperation {
    *            For example,
    *            if the Hadoop Yarn is used, for spark applications,
    *            the tag will be preset via spark.yarn.tags
+   * @param proxyUser the proxy user to use for executing kill commands.
+   *                  For secured YARN cluster, the Kyuubi Server's user typically
+   *                  has no permission to kill the application. Admin user or
+   *                  application owner should be used instead.
    * @return a message contains response describing how the kill process.
    *
    * @note For implementations, please suppress exceptions and always return KillResponse
    */
-  def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse
+  def killApplicationByTag(
+      appMgrInfo: ApplicationManagerInfo,
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse
   /**
    * Get the engine/application status by the unique application tag
@@ -59,11 +66,16 @@ trait ApplicationOperation {
    * @param appMgrInfo the application manager information
    * @param tag the unique application tag for engine instance.
    * @param submitTime engine submit to resourceManager time
+   * @param proxyUser the proxy user to use for creating YARN client
+   *                  For secured YARN cluster, the Kyuubi Server's user may have no permission
+   *                  to operate the application. Admin user or application owner could be used
+   *                  instead.
    * @return [[ApplicationInfo]]
    */
   def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
+      proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): ApplicationInfo
 }

View File

@@ -233,7 +233,8 @@ private[kyuubi] class EngineRef(
         }
         if (started + timeout <= System.currentTimeMillis()) {
-          val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId)
+          val killMessage =
+            engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser))
           builder.close(true)
           MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
           throw KyuubiSQLException(
@@ -254,6 +255,7 @@ private[kyuubi] class EngineRef(
           val applicationInfo = engineMgr.getApplicationInfo(
             builder.appMgrInfo(),
             engineRefId,
+            Some(appUser),
             Some(started))
           applicationInfo.foreach { appInfo =>
@@ -310,7 +312,7 @@ private[kyuubi] class EngineRef(
       try {
         val appMgrInfo = builder.appMgrInfo()
         builder.close(true)
-        engineManager.killApplication(appMgrInfo, engineRefId)
+        engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser))
       } catch {
         case e: Exception =>
           warn(s"Error closing engine builder, engineRefId: $engineRefId", e)

View File

@@ -83,14 +83,16 @@ class JpsApplicationOperation extends ApplicationOperation {
   override def killApplicationByTag(
       appMgrInfo: ApplicationManagerInfo,
-      tag: String): KillResponse = {
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = {
     killJpsApplicationByTag(tag, true)
   }
   override def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
-      submitTime: Option[Long]): ApplicationInfo = {
+      proxyUser: Option[String] = None,
+      submitTime: Option[Long] = None): ApplicationInfo = {
     val commandOption = getEngine(tag)
     if (commandOption.nonEmpty) {
       val idAndCmd = commandOption.get

View File

@@ -117,7 +117,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   override def killApplicationByTag(
       appMgrInfo: ApplicationManagerInfo,
-      tag: String): KillResponse = {
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = {
     if (kyuubiConf == null) {
       throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
     }
@@ -157,7 +158,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   override def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
-      submitTime: Option[Long]): ApplicationInfo = {
+      proxyUser: Option[String] = None,
+      submitTime: Option[Long] = None): ApplicationInfo = {
     if (kyuubiConf == null) {
       throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
     }

View File

@@ -60,11 +60,14 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
     super.stop()
   }
-  def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse = {
+  def killApplication(
+      appMgrInfo: ApplicationManagerInfo,
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = {
     var (killed, lastMessage): KillResponse = (false, null)
     for (operation <- operations if !killed) {
       if (operation.isSupported(appMgrInfo)) {
-        val (k, m) = operation.killApplicationByTag(appMgrInfo, tag)
+        val (k, m) = operation.killApplicationByTag(appMgrInfo, tag, proxyUser)
         killed = k
         lastMessage = m
       }
@@ -83,10 +86,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
   def getApplicationInfo(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
+      proxyUser: Option[String] = None,
       submitTime: Option[Long] = None): Option[ApplicationInfo] = {
     val operation = operations.find(_.isSupported(appMgrInfo))
     operation match {
-      case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, submitTime))
+      case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, proxyUser, submitTime))
       case None => None
     }
   }

View File

@@ -21,11 +21,14 @@ import java.util.Locale
 import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
 import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy
+import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
 import org.apache.kyuubi.engine.ApplicationOperation._
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
 import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
@@ -33,105 +36,135 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
 class YarnApplicationOperation extends ApplicationOperation with Logging {
-  @volatile private var yarnClient: YarnClient = _
+  private var yarnConf: Configuration = _
+  @volatile private var adminYarnClient: Option[YarnClient] = None
   private var submitTimeout: Long = _
   override def initialize(conf: KyuubiConf): Unit = {
     submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
-    val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
-    // YarnClient is thread-safe
-    val c = YarnClient.createYarnClient()
-    c.init(yarnConf)
-    c.start()
-    yarnClient = c
-    info(s"Successfully initialized yarn client: ${c.getServiceState}")
+    yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
+    def createYarnClientWithCurrentUser(): Unit = {
+      val c = createYarnClient(yarnConf)
+      info(s"Creating admin YARN client with current user: ${Utils.currentUser}.")
+      adminYarnClient = Some(c)
+    }
+    def createYarnClientWithProxyUser(proxyUser: String): Unit = Utils.doAs(proxyUser) { () =>
+      val c = createYarnClient(yarnConf)
+      info(s"Creating admin YARN client with proxy user: $proxyUser.")
+      adminYarnClient = Some(c)
+    }
+    YarnUserStrategy.withName(conf.get(KyuubiConf.YARN_USER_STRATEGY)) match {
+      case NONE =>
+        createYarnClientWithCurrentUser()
+      case ADMIN if conf.get(KyuubiConf.YARN_USER_ADMIN) == Utils.currentUser =>
+        createYarnClientWithCurrentUser()
+      case ADMIN =>
+        createYarnClientWithProxyUser(conf.get(KyuubiConf.YARN_USER_ADMIN))
+      case OWNER =>
+        info("Skip initializing admin YARN client")
+    }
   }
-  override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
-    yarnClient != null && appMgrInfo.resourceManager.exists(
-      _.toLowerCase(Locale.ROOT).startsWith("yarn"))
+  private def createYarnClient(_yarnConf: Configuration): YarnClient = {
+    // YarnClient is thread-safe
+    val yarnClient = YarnClient.createYarnClient()
+    yarnClient.init(_yarnConf)
+    yarnClient.start()
+    yarnClient
   }
+  private def withYarnClient[T](proxyUser: Option[String])(action: YarnClient => T): T = {
+    (adminYarnClient, proxyUser) match {
+      case (Some(yarnClient), _) =>
+        action(yarnClient)
+      case (None, Some(user)) =>
+        Utils.doAs(user) { () =>
+          var yarnClient: YarnClient = null
+          try {
+            yarnClient = createYarnClient(yarnConf)
+            action(yarnClient)
+          } finally {
+            Utils.tryLogNonFatalError(yarnClient.close())
+          }
+        }
+      case (None, None) =>
+        throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    }
+  }
+  override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean =
+    appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))
   override def killApplicationByTag(
       appMgrInfo: ApplicationManagerInfo,
-      tag: String): KillResponse = {
-    if (yarnClient != null) {
-      try {
-        val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
-        if (reports.isEmpty) {
-          (false, NOT_FOUND)
-        } else {
-          try {
-            val applicationId = reports.get(0).getApplicationId
-            yarnClient.killApplication(applicationId)
-            (true, s"Succeeded to terminate: $applicationId with $tag")
-          } catch {
-            case e: Exception =>
-              (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
-          }
+      tag: String,
+      proxyUser: Option[String] = None): KillResponse = withYarnClient(proxyUser) { yarnClient =>
+    try {
+      val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
+      if (reports.isEmpty) {
+        (false, NOT_FOUND)
+      } else {
+        try {
+          val applicationId = reports.get(0).getApplicationId
+          yarnClient.killApplication(applicationId)
+          (true, s"Succeeded to terminate: $applicationId with $tag")
+        } catch {
+          case e: Exception =>
+            (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
         }
-      } catch {
-        case e: Exception =>
-          (
-            false,
-            s"Failed to get while terminating application with tag $tag," +
-              s" due to ${e.getMessage}")
       }
-    } else {
-      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    } catch {
+      case e: Exception =>
+        (
+          false,
+          s"Failed to get while terminating application with tag $tag, due to ${e.getMessage}")
     }
   }
   override def getApplicationInfoByTag(
       appMgrInfo: ApplicationManagerInfo,
       tag: String,
-      submitTime: Option[Long]): ApplicationInfo = {
-    if (yarnClient != null) {
-      debug(s"Getting application info from Yarn cluster by $tag tag")
-      val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
-      if (reports.isEmpty) {
-        debug(s"Application with tag $tag not found")
-        submitTime match {
-          case Some(_submitTime) =>
-            val elapsedTime = System.currentTimeMillis - _submitTime
-            if (elapsedTime > submitTimeout) {
-              error(s"Can't find target yarn application by tag: $tag, " +
-                s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
-              ApplicationInfo.NOT_FOUND
-            } else {
-              warn("Wait for yarn application to be submitted, " +
-                s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
-              ApplicationInfo.UNKNOWN
-            }
-          case _ => ApplicationInfo.NOT_FOUND
-        }
-      } else {
-        val report = reports.get(0)
-        val info = ApplicationInfo(
-          id = report.getApplicationId.toString,
-          name = report.getName,
-          state = toApplicationState(
-            report.getApplicationId.toString,
-            report.getYarnApplicationState,
-            report.getFinalApplicationStatus),
-          url = Option(report.getTrackingUrl),
-          error = Option(report.getDiagnostics))
-        debug(s"Successfully got application info by $tag: $info")
-        info
+      proxyUser: Option[String] = None,
+      submitTime: Option[Long] = None): ApplicationInfo = withYarnClient(proxyUser) { yarnClient =>
+    debug(s"Getting application info from Yarn cluster by $tag tag")
+    val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
+    if (reports.isEmpty) {
+      debug(s"Application with tag $tag not found")
+      submitTime match {
+        case Some(_submitTime) =>
+          val elapsedTime = System.currentTimeMillis - _submitTime
+          if (elapsedTime > submitTimeout) {
+            error(s"Can't find target yarn application by tag: $tag, " +
+              s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+            ApplicationInfo.NOT_FOUND
+          } else {
+            warn("Wait for yarn application to be submitted, " +
+              s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+            ApplicationInfo.UNKNOWN
+          }
+        case _ => ApplicationInfo.NOT_FOUND
       }
     } else {
-      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+      val report = reports.get(0)
+      val info = ApplicationInfo(
+        id = report.getApplicationId.toString,
+        name = report.getName,
+        state = toApplicationState(
+          report.getApplicationId.toString,
+          report.getYarnApplicationState,
+          report.getFinalApplicationStatus),
+        url = Option(report.getTrackingUrl),
+        error = Option(report.getDiagnostics))
+      debug(s"Successfully got application info by $tag: $info")
+      info
     }
   }
-  override def stop(): Unit = {
-    if (yarnClient != null) {
-      try {
-        yarnClient.stop()
-      } catch {
-        case e: Exception => error(e.getMessage)
-      }
-    }
+  override def stop(): Unit = adminYarnClient.foreach { yarnClient =>
+    Utils.tryLogNonFatalError(yarnClient.stop())
   }
 }

View File

@@ -110,6 +110,7 @@ class BatchJobSubmission(
       applicationManager.getApplicationInfo(
         builder.appMgrInfo(),
         batchId,
+        Some(session.user),
         Some(_submitTime))
     applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
@@ -124,7 +125,7 @@ class BatchJobSubmission(
   }
   private[kyuubi] def killBatchApplication(): KillResponse = {
-    applicationManager.killApplication(builder.appMgrInfo(), batchId)
+    applicationManager.killApplication(builder.appMgrInfo(), batchId, Some(session.user))
   }
   private val applicationCheckInterval =

View File

@@ -323,6 +323,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
           val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
             metadata.appMgrInfo,
             batchId,
+            Some(userName),
             // prevent that the batch be marked as terminated if application state is NOT_FOUND
             Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis)))
           buildBatch(metadata, batchAppStatus)
@@ -452,9 +453,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       }
     }
-    def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String): KillResponse = {
+    def forceKill(
+        appMgrInfo: ApplicationManagerInfo,
+        batchId: String,
+        user: String): KillResponse = {
       val (killed, message) = sessionManager.applicationManager
-        .killApplication(appMgrInfo, batchId)
+        .killApplication(appMgrInfo, batchId, Some(user))
       info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
       sessionManager.updateMetadata(Metadata(identifier = batchId, peerInstanceClosed = true))
       (killed, message)
@@ -480,7 +484,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
           new CloseBatchResponse(false, s"The batch[$metadata] has been terminated.")
         } else {
           info(s"Cancel batch[$batchId] with state ${metadata.state} by killing application")
-          val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+          val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
           new CloseBatchResponse(killed, msg)
         }
       } else if (metadata.kyuubiInstance != fe.connectionUrl) {
@@ -491,12 +495,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
         } catch {
           case e: KyuubiRestException =>
             error(s"Error redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}", e)
-            val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+            val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
             new CloseBatchResponse(killed, if (killed) msg else Utils.stringifyException(e))
         }
       } else { // should not happen, but handle this for safe
         warn(s"Something wrong on deleting batch[$batchId], try forcibly killing application")
-        val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+        val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName)
         new CloseBatchResponse(killed, msg)
       }
     }.getOrElse {