diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index add63544d..832099764 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -460,6 +460,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.spnego.keytab | <undefined> | Keytab file for SPNego principal | string | 1.6.0 | | kyuubi.spnego.principal | <undefined> | SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM. SPNego service principal would be used when restful Kerberos security is enabled. This needs to be set only if SPNEGO is to be used in authentication. | string | 1.6.0 | +### Yarn + +| Key | Default | Meaning | Type | Since | +|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|-------| +| kyuubi.yarn.user.admin | yarn | When kyuubi.yarn.user.strategy is set to ADMIN, use this admin user to construct YARN client for application management, e.g. kill application. | string | 1.8.0 | +| kyuubi.yarn.user.strategy | NONE | Determine which user to use to construct YARN client for application management, e.g. kill application. Options: | string | 1.8.0 | + ### Zookeeper | Key | Default | Meaning | Type | Since | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index fac30a173..accfca4c9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -21,6 +21,7 @@ import java.io._ import java.net.{Inet4Address, InetAddress, NetworkInterface} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths, StandardCopyOption} +import java.security.PrivilegedAction import java.text.SimpleDateFormat import java.util.{Date, Properties, TimeZone, UUID} import java.util.concurrent.TimeUnit @@ -203,6 +204,14 @@ object Utils extends Logging { def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName + def doAs[T]( + proxyUser: String, + realUser: UserGroupInformation = UserGroupInformation.getCurrentUser)(f: () => T): T = { + UserGroupInformation.createProxyUser(proxyUser, realUser).doAs(new PrivilegedAction[T] { + override def run(): T = f() + }) + } + private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r /** diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 61d97da9e..50006b95e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2826,6 +2826,32 @@ object KyuubiConf { .version("1.7.2") .fallbackConf(ENGINE_SUBMIT_TIMEOUT) + object YarnUserStrategy extends Enumeration { + type YarnUserStrategy = Value + val NONE, ADMIN, OWNER = Value + } + + val YARN_USER_STRATEGY: ConfigEntry[String] = + buildConf("kyuubi.yarn.user.strategy") + .doc("Determine which user to use to construct YARN client for application management, " + + "e.g. kill application. Options: ") + .version("1.8.0") + .stringConf + .checkValues(YarnUserStrategy) + .createWithDefault("NONE") + + val YARN_USER_ADMIN: ConfigEntry[String] = + buildConf("kyuubi.yarn.user.admin") + .doc(s"When ${YARN_USER_STRATEGY.key} is set to ADMIN, use this admin user to " + + "construct YARN client for application management, e.g. kill application.") + .version("1.8.0") + .stringConf + .createWithDefault("yarn") + /** * Holds information about keys that have been deprecated. * diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala index 2acce39cc..23a49c1ae 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala @@ -47,11 +47,18 @@ trait ApplicationOperation { * For example, * if the Hadoop Yarn is used, for spark applications, * the tag will be preset via spark.yarn.tags + * @param proxyUser the proxy user to use for executing kill commands. + * For secured YARN cluster, the Kyuubi Server's user typically + * has no permission to kill the application. Admin user or + * application owner should be used instead. * @return a message contains response describing how the kill process. * * @note For implementations, please suppress exceptions and always return KillResponse */ - def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse + def killApplicationByTag( + appMgrInfo: ApplicationManagerInfo, + tag: String, + proxyUser: Option[String] = None): KillResponse /** * Get the engine/application status by the unique application tag @@ -59,11 +66,16 @@ trait ApplicationOperation { * @param appMgrInfo the application manager information * @param tag the unique application tag for engine instance. * @param submitTime engine submit to resourceManager time + * @param proxyUser the proxy user to use for creating YARN client + * For secured YARN cluster, the Kyuubi Server's user may have no permission + * to operate the application. Admin user or application owner could be used + * instead. * @return [[ApplicationInfo]] */ def getApplicationInfoByTag( appMgrInfo: ApplicationManagerInfo, tag: String, + proxyUser: Option[String] = None, submitTime: Option[Long] = None): ApplicationInfo } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 160e7f39e..6122a6f13 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -233,7 +233,8 @@ private[kyuubi] class EngineRef( } if (started + timeout <= System.currentTimeMillis()) { - val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId) + val killMessage = + engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser)) builder.close(true) MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( @@ -254,6 +255,7 @@ private[kyuubi] class EngineRef( val applicationInfo = engineMgr.getApplicationInfo( builder.appMgrInfo(), engineRefId, + Some(appUser), Some(started)) applicationInfo.foreach { appInfo => @@ -310,7 +312,7 @@ private[kyuubi] class EngineRef( try { val appMgrInfo = builder.appMgrInfo() builder.close(true) - engineManager.killApplication(appMgrInfo, engineRefId) + engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser)) } catch { case e: Exception => warn(s"Error closing engine builder, engineRefId: $engineRefId", e) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala index 64dacbb64..1d0d58d16 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala @@ -83,14 +83,16 @@ class JpsApplicationOperation extends ApplicationOperation { override def killApplicationByTag( appMgrInfo: ApplicationManagerInfo, - tag: String): KillResponse = { + tag: String, + proxyUser: Option[String] = None): KillResponse = { killJpsApplicationByTag(tag, true) } override def getApplicationInfoByTag( appMgrInfo: ApplicationManagerInfo, tag: String, - submitTime: Option[Long]): ApplicationInfo = { + proxyUser: Option[String] = None, + submitTime: Option[Long] = None): ApplicationInfo = { val commandOption = getEngine(tag) if (commandOption.nonEmpty) { val idAndCmd = commandOption.get diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index c9d509efe..16a0c29d1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -117,7 +117,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def killApplicationByTag( appMgrInfo: ApplicationManagerInfo, - tag: String): KillResponse = { + tag: String, + proxyUser: Option[String] = None): KillResponse = { if (kyuubiConf == null) { throw new IllegalStateException("Methods initialize and isSupported must be called ahead") } @@ -157,7 +158,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def getApplicationInfoByTag( appMgrInfo: ApplicationManagerInfo, tag: String, - submitTime: Option[Long]): ApplicationInfo = { + proxyUser: Option[String] = None, + submitTime: Option[Long] = None): ApplicationInfo = { if (kyuubiConf == null) { throw new IllegalStateException("Methods initialize and isSupported must be called ahead") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 4e121d297..f8b640053 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -60,11 +60,14 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager super.stop() } - def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse = { + def killApplication( + appMgrInfo: ApplicationManagerInfo, + tag: String, + proxyUser: Option[String] = None): KillResponse = { var (killed, lastMessage): KillResponse = (false, null) for (operation <- operations if !killed) { if (operation.isSupported(appMgrInfo)) { - val (k, m) = operation.killApplicationByTag(appMgrInfo, tag) + val (k, m) = operation.killApplicationByTag(appMgrInfo, tag, proxyUser) killed = k lastMessage = m } @@ -83,10 +86,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager def getApplicationInfo( appMgrInfo: ApplicationManagerInfo, tag: String, + proxyUser: Option[String] = None, submitTime: Option[Long] = None): Option[ApplicationInfo] = { val operation = operations.find(_.isSupported(appMgrInfo)) operation match { - case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, submitTime)) + case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, proxyUser, submitTime)) case None => None } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala index d87fc406a..1f672ad70 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala @@ -21,11 +21,14 @@ import java.util.Locale import scala.collection.JavaConverters._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState} import org.apache.hadoop.yarn.client.api.YarnClient -import org.apache.kyuubi.Logging +import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy +import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._ import org.apache.kyuubi.engine.ApplicationOperation._ import org.apache.kyuubi.engine.ApplicationState.ApplicationState import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState @@ -33,105 +36,135 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils class YarnApplicationOperation extends ApplicationOperation with Logging { - @volatile private var yarnClient: YarnClient = _ + private var yarnConf: Configuration = _ + @volatile private var adminYarnClient: Option[YarnClient] = None private var submitTimeout: Long = _ override def initialize(conf: KyuubiConf): Unit = { submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT) - val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf) - // YarnClient is thread-safe - val c = YarnClient.createYarnClient() - c.init(yarnConf) - c.start() - yarnClient = c - info(s"Successfully initialized yarn client: ${c.getServiceState}") + yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf) + + def createYarnClientWithCurrentUser(): Unit = { + val c = createYarnClient(yarnConf) + info(s"Creating admin YARN client with current user: ${Utils.currentUser}.") + adminYarnClient = Some(c) + } + + def createYarnClientWithProxyUser(proxyUser: String): Unit = Utils.doAs(proxyUser) { () => + val c = createYarnClient(yarnConf) + info(s"Creating admin YARN client with proxy user: $proxyUser.") + adminYarnClient = Some(c) + } + + YarnUserStrategy.withName(conf.get(KyuubiConf.YARN_USER_STRATEGY)) match { + case NONE => + createYarnClientWithCurrentUser() + case ADMIN if conf.get(KyuubiConf.YARN_USER_ADMIN) == Utils.currentUser => + createYarnClientWithCurrentUser() + case ADMIN => + createYarnClientWithProxyUser(conf.get(KyuubiConf.YARN_USER_ADMIN)) + case OWNER => + info("Skip initializing admin YARN client") + } } - override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { - yarnClient != null && appMgrInfo.resourceManager.exists( - _.toLowerCase(Locale.ROOT).startsWith("yarn")) + private def createYarnClient(_yarnConf: Configuration): YarnClient = { + // YarnClient is thread-safe + val yarnClient = YarnClient.createYarnClient() + yarnClient.init(_yarnConf) + yarnClient.start() + yarnClient } + private def withYarnClient[T](proxyUser: Option[String])(action: YarnClient => T): T = { + (adminYarnClient, proxyUser) match { + case (Some(yarnClient), _) => + action(yarnClient) + case (None, Some(user)) => + Utils.doAs(user) { () => + var yarnClient: YarnClient = null + try { + yarnClient = createYarnClient(yarnConf) + action(yarnClient) + } finally { + Utils.tryLogNonFatalError(yarnClient.close()) + } + } + case (None, None) => + throw new IllegalStateException("Methods initialize and isSupported must be called ahead") + } + } + + override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = + appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn")) + override def killApplicationByTag( appMgrInfo: ApplicationManagerInfo, - tag: String): KillResponse = { - if (yarnClient != null) { - try { - val reports = yarnClient.getApplications(null, null, Set(tag).asJava) - if (reports.isEmpty) { - (false, NOT_FOUND) - } else { - try { - val applicationId = reports.get(0).getApplicationId - yarnClient.killApplication(applicationId) - (true, s"Succeeded to terminate: $applicationId with $tag") - } catch { - case e: Exception => - (false, s"Failed to terminate application with $tag, due to ${e.getMessage}") - } + tag: String, + proxyUser: Option[String] = None): KillResponse = withYarnClient(proxyUser) { yarnClient => + try { + val reports = yarnClient.getApplications(null, null, Set(tag).asJava) + if (reports.isEmpty) { + (false, NOT_FOUND) + } else { + try { + val applicationId = reports.get(0).getApplicationId + yarnClient.killApplication(applicationId) + (true, s"Succeeded to terminate: $applicationId with $tag") + } catch { + case e: Exception => + (false, s"Failed to terminate application with $tag, due to ${e.getMessage}") } - } catch { - case e: Exception => - ( - false, - s"Failed to get while terminating application with tag $tag," + - s" due to ${e.getMessage}") } - } else { - throw new IllegalStateException("Methods initialize and isSupported must be called ahead") + } catch { + case e: Exception => + ( + false, + s"Failed to get while terminating application with tag $tag, due to ${e.getMessage}") } } override def getApplicationInfoByTag( appMgrInfo: ApplicationManagerInfo, tag: String, - submitTime: Option[Long]): ApplicationInfo = { - if (yarnClient != null) { - debug(s"Getting application info from Yarn cluster by $tag tag") - val reports = yarnClient.getApplications(null, null, Set(tag).asJava) - if (reports.isEmpty) { - debug(s"Application with tag $tag not found") - submitTime match { - case Some(_submitTime) => - val elapsedTime = System.currentTimeMillis - _submitTime - if (elapsedTime > submitTimeout) { - error(s"Can't find target yarn application by tag: $tag, " + - s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.") - ApplicationInfo.NOT_FOUND - } else { - warn("Wait for yarn application to be submitted, " + - s"elapsed time: ${elapsedTime}ms, return UNKNOWN status") - ApplicationInfo.UNKNOWN - } - case _ => ApplicationInfo.NOT_FOUND - } - } else { - val report = reports.get(0) - val info = ApplicationInfo( - id = report.getApplicationId.toString, - name = report.getName, - state = toApplicationState( - report.getApplicationId.toString, - report.getYarnApplicationState, - report.getFinalApplicationStatus), - url = Option(report.getTrackingUrl), - error = Option(report.getDiagnostics)) - debug(s"Successfully got application info by $tag: $info") - info + proxyUser: Option[String] = None, + submitTime: Option[Long] = None): ApplicationInfo = withYarnClient(proxyUser) { yarnClient => + debug(s"Getting application info from Yarn cluster by $tag tag") + val reports = yarnClient.getApplications(null, null, Set(tag).asJava) + if (reports.isEmpty) { + debug(s"Application with tag $tag not found") + submitTime match { + case Some(_submitTime) => + val elapsedTime = System.currentTimeMillis - _submitTime + if (elapsedTime > submitTimeout) { + error(s"Can't find target yarn application by tag: $tag, " + + s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.") + ApplicationInfo.NOT_FOUND + } else { + warn("Wait for yarn application to be submitted, " + + s"elapsed time: ${elapsedTime}ms, return UNKNOWN status") + ApplicationInfo.UNKNOWN + } + case _ => ApplicationInfo.NOT_FOUND } } else { - throw new IllegalStateException("Methods initialize and isSupported must be called ahead") + val report = reports.get(0) + val info = ApplicationInfo( + id = report.getApplicationId.toString, + name = report.getName, + state = toApplicationState( + report.getApplicationId.toString, + report.getYarnApplicationState, + report.getFinalApplicationStatus), + url = Option(report.getTrackingUrl), + error = Option(report.getDiagnostics)) + debug(s"Successfully got application info by $tag: $info") + info } } - override def stop(): Unit = { - if (yarnClient != null) { - try { - yarnClient.stop() - } catch { - case e: Exception => error(e.getMessage) - } - } + override def stop(): Unit = adminYarnClient.foreach { yarnClient => + Utils.tryLogNonFatalError(yarnClient.stop()) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 4ea609540..779dc48ae 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -110,6 +110,7 @@ class BatchJobSubmission( applicationManager.getApplicationInfo( builder.appMgrInfo(), batchId, + Some(session.user), Some(_submitTime)) applicationId(applicationInfo).foreach { _ => if (_appStartTime <= 0) { @@ -124,7 +125,7 @@ class BatchJobSubmission( } private[kyuubi] def killBatchApplication(): KillResponse = { - applicationManager.killApplication(builder.appMgrInfo(), batchId) + applicationManager.killApplication(builder.appMgrInfo(), batchId, Some(session.user)) } private val applicationCheckInterval = diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index a525ac4b2..76d913a98 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -323,6 +323,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val batchAppStatus = sessionManager.applicationManager.getApplicationInfo( metadata.appMgrInfo, batchId, + Some(userName), // prevent that the batch be marked as terminated if application state is NOT_FOUND Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis))) buildBatch(metadata, batchAppStatus) @@ -452,9 +453,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } } - def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String): KillResponse = { + def forceKill( + appMgrInfo: ApplicationManagerInfo, + batchId: String, + user: String): KillResponse = { val (killed, message) = sessionManager.applicationManager - .killApplication(appMgrInfo, batchId) + .killApplication(appMgrInfo, batchId, Some(user)) info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}") sessionManager.updateMetadata(Metadata(identifier = batchId, peerInstanceClosed = true)) (killed, message) @@ -480,7 +484,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { new CloseBatchResponse(false, s"The batch[$metadata] has been terminated.") } else { info(s"Cancel batch[$batchId] with state ${metadata.state} by killing application") - val (killed, msg) = forceKill(metadata.appMgrInfo, batchId) + val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName) new CloseBatchResponse(killed, msg) } } else if (metadata.kyuubiInstance != fe.connectionUrl) { @@ -491,12 +495,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } catch { case e: KyuubiRestException => error(s"Error redirecting delete batch[$batchId] to ${metadata.kyuubiInstance}", e) - val (killed, msg) = forceKill(metadata.appMgrInfo, batchId) + val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName) new CloseBatchResponse(killed, if (killed) msg else Utils.stringifyException(e)) } } else { // should not happen, but handle this for safe warn(s"Something wrong on deleting batch[$batchId], try forcibly killing application") - val (killed, msg) = forceKill(metadata.appMgrInfo, batchId) + val (killed, msg) = forceKill(metadata.appMgrInfo, batchId, userName) new CloseBatchResponse(killed, msg) } }.getOrElse {