[KYUUBI #6008] RESTful API supports killing engine forcibly

# 🔍 Description
## Issue References 🔗

## Describe Your Solution 🔧

I'd like to introduce the feature that allows users to forcibly kill an engine through API.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6008 from zhaohehuhu/dev-0123.

Closes #6008

00c208a26 [Cheng Pan] fix
8721a2d2a [Cheng Pan] log
efc7587f7 [Cheng Pan] client
cd5129db3 [Cheng Pan] fix ut
5e1b6a161 [Cheng Pan] Update kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
72d7df357 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
6d5d08710 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b013194d1 [zhaohehuhu] move the position of log
0cdeede7a [zhaohehuhu] restore ENGINE_SPARK_REGISTER_ATTRIBUTES
f826d0515 [zhaohehuhu] reformat
a13466e37 [zhaohehuhu] update doc and log string encoded
3a2f5970a [zhaohehuhu] refactor
ae24ea74d [zhaohehuhu] refactor UT
936a54e27 [Wang, Fei] register app mgr info
9bacc2c8b [hezhao2] fix UTs
11106d75b [Wang, Fei] comments
ba57c2c3f [hezhao2] refactor code to delete the node and then kill application
634ceb677 [hezhao2] reformat
ab31382ee [hezhao2] reformat
513bcdc57 [hezhao2] fix UT
506220654 [hezhao2] get refId by user, sharelevel and subdomain
3ad9577df [hezhao2] rename params to support multiple engines
632c56b88 [hezhao2] fix unused import
bd7bb45f0 [hezhao2] refactor
fb9b25176 [hezhao2] add default value for forceKill param
070aad06f [hezhao2] refactor
51827ecde [hezhao2] fix UT
f11e7657e [hezhao2] add an UT
8a65cf113 [hezhao2] refactor code
d6f82ff9a [hezhao2] refactor code
f3ab9c546 [hezhao2] new parameter added to decide whether to kill forcefully handle the result of killApplication
5faa5b54f [hezhao2] kill engine forcibly

Lead-authored-by: hezhao2 <hezhao2@cisco.com>
Co-authored-by: zhaohehuhu <luoyedeyi459@163.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
hezhao2 2024-07-01 15:28:09 +08:00 committed by Cheng Pan
parent 693d8a22f8
commit ab273c8ba3
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
12 changed files with 148 additions and 23 deletions

View File

@ -457,13 +457,14 @@ Delete the specified engine.
#### Request Parameters
| Name | Description | Type |
|:------------------------|:------------------------------|:-----------------|
| type | the engine type | String(optional) |
| sharelevel | the engine share level | String(optional) |
| subdomain | the engine subdomain | String(optional) |
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
| Name | Description | Type |
|:------------------------|:-------------------------------------------------------------|:------------------|
| type | the engine type | String(optional) |
| sharelevel | the engine share level | String(optional) |
| subdomain | the engine subdomain | String(optional) |
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
| kill | whether to kill the engine forcibly. Default value is false. | Boolean(optional) |
`proxyUser` is an alternative to `hive.server2.proxy.user`, and the current behavior is consistent with
`hive.server2.proxy.user`. When both parameters are set, `proxyUser` takes precedence.

View File

@ -100,7 +100,8 @@ class SparkTBinaryFrontendService(
val extraAttributes = conf.get(KyuubiConf.ENGINE_SPARK_REGISTER_ATTRIBUTES).map { attr =>
attr -> KyuubiSparkUtil.globalSparkContext.getConf.get(attr, "")
}.toMap
val attributes = extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId)
val attributes =
super.attributes ++ extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId)
// TODO Support Spark Web UI Enabled SSL
sc.uiWebUrl match {
case Some(url) => attributes ++ Map(KYUUBI_ENGINE_URL -> url.split("//").last)

View File

@ -36,6 +36,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_ENGINE_APP_MGR_INFO_KEY = "kyuubi.engine.appMgrInfo"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.service
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.ServiceState.LATENT
/**
@ -40,4 +40,8 @@ abstract class AbstractFrontendService(name: String)
discoveryService.foreach(addService)
super.initialize(conf)
}
override def attributes: Map[String, String] = {
conf.getAll.filter(_._1 == KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
}
}

View File

@ -23,8 +23,12 @@ import org.apache.kyuubi.client.api.v1.dto.Engine;
import org.apache.kyuubi.client.api.v1.dto.OperationData;
import org.apache.kyuubi.client.api.v1.dto.ServerData;
import org.apache.kyuubi.client.api.v1.dto.SessionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AdminRestApi {
private static final Logger LOG = LoggerFactory.getLogger(AdminRestApi.class);
private KyuubiRestClient client;
private static final String API_BASE_PATH = "admin";
@ -65,13 +69,25 @@ public class AdminRestApi {
return this.getClient().post(path, null, client.getAuthHeader());
}
/** This method is deprecated since 1.10 */
@Deprecated
public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser) {
LOG.warn(
"The method `deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser)` "
+ "is deprecated since 1.10.0, using "
+ "`deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser, kill)` instead.");
return this.deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser, false);
}
public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser, boolean kill) {
Map<String, Object> params = new HashMap<>();
params.put("type", engineType);
params.put("sharelevel", shareLevel);
params.put("subdomain", subdomain);
params.put("hive.server2.proxy.user", hs2ProxyUser);
params.put("kill", kill);
return this.getClient().delete(API_BASE_PATH + "/engine", params, client.getAuthHeader());
}

View File

@ -17,6 +17,13 @@
package org.apache.kyuubi.engine
import java.nio.charset.StandardCharsets
import java.util.Base64
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
@ -132,8 +139,11 @@ case class ApplicationManagerInfo(
resourceManager: Option[String],
kubernetesInfo: KubernetesInfo = KubernetesInfo())
object ApplicationManagerInfo {
object ApplicationManagerInfo extends Logging {
final val DEFAULT_KUBERNETES_NAMESPACE = "default"
val mapper: ObjectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.registerModule(DefaultScalaModule)
def apply(
resourceManager: Option[String],
@ -143,4 +153,22 @@ object ApplicationManagerInfo {
resourceManager,
KubernetesInfo(kubernetesContext, kubernetesNamespace))
}
def serialize(appMgrInfo: ApplicationManagerInfo): String = {
Base64.getEncoder.encodeToString(
mapper.writeValueAsString(appMgrInfo).getBytes(StandardCharsets.UTF_8))
}
def deserialize(encodedStr: String): ApplicationManagerInfo = {
try {
val json = new String(
Base64.getDecoder.decode(encodedStr.getBytes),
StandardCharsets.UTF_8)
mapper.readValue(json, classOf[ApplicationManagerInfo])
} catch {
case _: Throwable =>
error(s"Fail to deserialize the encoded string: $encodedStr")
ApplicationManagerInfo(None)
}
}
}

View File

@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
@ -169,6 +169,11 @@ trait ProcBuilder {
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false
// Set engine application manger info conf
conf.set(
KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY,
ApplicationManagerInfo.serialize(appMgrInfo()))
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
val currentTime = System.currentTimeMillis()

View File

@ -31,15 +31,16 @@ import org.apache.commons.lang3.StringUtils
import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils}
import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionManager, SessionHandle}
@Tag(name = "Admin")
@Produces(Array(MediaType.APPLICATION_JSON))
@ -277,7 +278,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
@QueryParam("sharelevel") shareLevel: String,
@QueryParam("subdomain") subdomain: String,
@QueryParam("proxyUser") kyuubiProxyUser: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = {
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String,
@QueryParam("kill") @DefaultValue("false") kill: Boolean): Response = {
val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser)
val userName = if (fe.isAdministrator(fe.getRealUser())) {
Option(activeProxyUser).getOrElse(fe.getRealUser())
@ -286,24 +288,38 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
}
val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "default")
val engineSpace = calculateEngineSpace(engine)
val responseMsgBuilder = new StringBuilder()
withDiscoveryClient(fe.getConf) { discoveryClient =>
val engineNodes = discoveryClient.getChildren(engineSpace)
engineNodes.foreach { node =>
val nodePath = s"$engineSpace/$node"
val engineNodes = discoveryClient.getServiceNodesInfo(engineSpace, silent = true)
engineNodes.foreach { engineNode =>
val nodePath = s"$engineSpace/${engineNode.nodeName}"
val engineRefId = engineNode.engineRefId.orNull
info(s"Deleting engine node:$nodePath")
try {
discoveryClient.delete(nodePath)
responseMsgBuilder
.append(s"Engine $engineSpace refId=$engineRefId is deleted successfully.")
} catch {
case e: Exception =>
error(s"Failed to delete engine node:$nodePath", e)
throw new NotFoundException(s"Failed to delete engine node:$nodePath," +
s"${e.getMessage}")
}
if (kill && engineRefId != null) {
val appMgrInfo =
engineNode.attributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
.map(ApplicationManagerInfo.deserialize).getOrElse(ApplicationManagerInfo(None))
val killResponse = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
.applicationManager.killApplication(appMgrInfo, engineRefId)
responseMsgBuilder
.append(s"\nKilled engine with $appMgrInfo/$engineRefId: $killResponse")
}
}
}
Response.ok(s"Engine $engineSpace is deleted successfully.").build()
Response.ok(responseMsgBuilder.toString()).build()
}
@ApiResponse(

View File

@ -27,7 +27,9 @@ import scala.util.matching.Regex
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_APP_MGR_INFO_KEY, KYUUBI_ENGINE_CREDENTIALS_KEY}
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.engine.ApplicationManagerInfo.serialize
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
@ -39,6 +41,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
ENGINE_FLINK_JAVA_OPTIONS,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
.set(KYUUBI_ENGINE_APP_MGR_INFO_KEY, serialize(ApplicationManagerInfo(None)))
private def applicationModeConf = KyuubiConf()
.set("flink.execution.target", "yarn-application")
@ -46,6 +49,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
.set(APP_KEY, "kyuubi_connection_flink_paul")
.set("kyuubi.on", "off")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
.set(KYUUBI_ENGINE_APP_MGR_INFO_KEY, serialize(ApplicationManagerInfo(None)))
private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
private val tempOpt =

View File

@ -33,7 +33,7 @@ import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KubernetesInfo, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
@ -301,6 +301,54 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(!operations.map(op => op.getIdentifier).contains(operation.identifier.toString))
}
test("force to kill engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
val engine =
new EngineRef(
conf.clone,
Utils.currentUser,
true,
PluginLoader.loadGroupProvider(conf),
id,
null)
val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
Utils.currentUser,
"default")
withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
.queryParam("kill", "true")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.delete()
assert(response.getStatus === 200)
eventually(timeout(5.seconds), interval(100.milliseconds)) {
assert(client.getChildren(engineSpace).isEmpty, s"refId same with $id?")
}
eventually(timeout(30.seconds), interval(100.milliseconds)) {
val appMgrInfo = ApplicationManagerInfo(None, KubernetesInfo(None, None))
assert(engineMgr.getApplicationInfo(appMgrInfo, id)
.exists(_.state == ApplicationState.NOT_FOUND))
}
}
}
test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)

View File

@ -93,7 +93,7 @@ class AdminCtlSuite extends RestClientTestHelper with TestPrematureExit {
ldapUserPasswd)
testPrematureExitForAdminControlCli(
args,
s"Engine ${engineSpace} is deleted successfully.")
s"Engine ${engineSpace} refId=${id} is deleted successfully.")
args = Array(
"list",

View File

@ -85,8 +85,9 @@ class AdminRestApiSuite extends RestClientTestHelper {
assert(engines(0).getNamespace == engineSpace)
assert(engines(0).getAttributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_ID).startsWith("local-"))
val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "")
assert(result == s"Engine ${engineSpace} is deleted successfully.")
// kill engine to release memory quickly
val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "", true)
assert(result startsWith s"Engine ${engineSpace} refId=${id} is deleted successfully.")
engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala
assert(engines.isEmpty)