[KYUUBI #4994][REST] Support listing all engines
### _Why are the changes needed?_ close #4994 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5157 from lsm1/branch-kyuubi_4994. Closes #4994 02e7eb5fa [senmiaoliu] use abbr a 6d001e519 [senmiaoliu] replace 'allengine' with 'all' f9c548299 [senmiaoliu] fix style df64e773e [senmiaoliu] fix style 6a7d40e63 [senmiaoliu] remove allenginecommand 6306dd8a2 [senmiaoliu] support list all engine Authored-by: senmiaoliu <senmiaoliu@trip.com> Signed-off-by: Shaoyun Chen <csy@apache.org>
This commit is contained in:
parent
ffebc647f8
commit
16ae852891
@ -99,6 +99,8 @@ Usage: ``bin/kyuubi-admin list engine [options]``
|
||||
- The subdomain for the share level of an engine. If not specified, it will read the configuration item kyuubi.engine.share.level.subdomain from kyuubi-defaults.conf.
|
||||
* - --hs2ProxyUser
|
||||
- The proxy user to impersonate. When specified, it will list engines for the hs2ProxyUser.
|
||||
* - -a --all
|
||||
- All the engine.
|
||||
|
||||
.. _list_server:
|
||||
|
||||
|
||||
@ -61,6 +61,7 @@ class AdminControlCliArguments(args: Seq[String], env: Map[String, String] = sys
|
||||
| type ${cliConfig.engineOpts.engineType}
|
||||
| sharelevel ${cliConfig.engineOpts.engineShareLevel}
|
||||
| sharesubdomain ${cliConfig.engineOpts.engineSubdomain}
|
||||
| all ${cliConfig.engineOpts.all}
|
||||
""".stripMargin
|
||||
case ControlObject.SERVER =>
|
||||
s"""Parsed arguments:
|
||||
|
||||
@ -38,7 +38,8 @@ class AdminListEngineCommand(cliConfig: CliConfig)
|
||||
normalizedCliConfig.engineOpts.engineType,
|
||||
normalizedCliConfig.engineOpts.engineShareLevel,
|
||||
normalizedCliConfig.engineOpts.engineSubdomain,
|
||||
normalizedCliConfig.commonOpts.hs2ProxyUser).asScala
|
||||
normalizedCliConfig.commonOpts.hs2ProxyUser,
|
||||
normalizedCliConfig.engineOpts.all).asScala
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -52,7 +52,7 @@ object AdminCommandLine extends CommonCommandLine {
|
||||
.text("\tDelete resources.")
|
||||
.action((_, c) => c.copy(action = ControlAction.DELETE))
|
||||
.children(
|
||||
engineCmd(builder).text("\tDelete the specified engine node for user.")))
|
||||
deleteEngineCmd(builder).text("\tDelete the specified engine node for user.")))
|
||||
|
||||
}
|
||||
|
||||
@ -64,7 +64,7 @@ object AdminCommandLine extends CommonCommandLine {
|
||||
.text("\tList information about resources.")
|
||||
.action((_, c) => c.copy(action = ControlAction.LIST))
|
||||
.children(
|
||||
engineCmd(builder).text("\tList all the engine nodes for a user"),
|
||||
listEngineCmd(builder).text("\tList the engine nodes"),
|
||||
serverCmd(builder).text("\tList all the server nodes")))
|
||||
|
||||
}
|
||||
@ -80,7 +80,7 @@ object AdminCommandLine extends CommonCommandLine {
|
||||
refreshConfigCmd(builder).text("\tRefresh the config with specified type.")))
|
||||
}
|
||||
|
||||
private def engineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
|
||||
private def deleteEngineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
|
||||
import builder._
|
||||
cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE))
|
||||
.children(
|
||||
@ -95,6 +95,24 @@ object AdminCommandLine extends CommonCommandLine {
|
||||
.text("The engine share level this engine belong to."))
|
||||
}
|
||||
|
||||
private def listEngineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
|
||||
import builder._
|
||||
cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE))
|
||||
.children(
|
||||
opt[String]("engine-type").abbr("et")
|
||||
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineType = v)))
|
||||
.text("The engine type this engine belong to."),
|
||||
opt[String]("engine-subdomain").abbr("es")
|
||||
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineSubdomain = v)))
|
||||
.text("The engine subdomain this engine belong to."),
|
||||
opt[String]("engine-share-level").abbr("esl")
|
||||
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineShareLevel = v)))
|
||||
.text("The engine share level this engine belong to."),
|
||||
opt[String]("all").abbr("a")
|
||||
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(all = v)))
|
||||
.text("All the engine."))
|
||||
}
|
||||
|
||||
private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
|
||||
import builder._
|
||||
cmd("server").action((_, c) => c.copy(resource = ControlObject.SERVER))
|
||||
|
||||
@ -77,6 +77,7 @@ case class EngineOpts(
|
||||
user: String = null,
|
||||
engineType: String = null,
|
||||
engineSubdomain: String = null,
|
||||
engineShareLevel: String = null)
|
||||
engineShareLevel: String = null,
|
||||
all: String = null)
|
||||
|
||||
case class AdminConfigOpts(configType: String = null)
|
||||
|
||||
@ -158,13 +158,14 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
|
||||
|Command: list [engine|server]
|
||||
| List information about resources.
|
||||
|Command: list engine [options]
|
||||
| List all the engine nodes for a user
|
||||
| List the engine nodes
|
||||
| -et, --engine-type <value>
|
||||
| The engine type this engine belong to.
|
||||
| -es, --engine-subdomain <value>
|
||||
| The engine subdomain this engine belong to.
|
||||
| -esl, --engine-share-level <value>
|
||||
| The engine share level this engine belong to.
|
||||
| -a, --all <value> All the engine.
|
||||
|Command: list server
|
||||
| List all the server nodes
|
||||
|
|
||||
|
||||
@ -73,12 +73,13 @@ public class AdminRestApi {
|
||||
}
|
||||
|
||||
public List<Engine> listEngines(
|
||||
String engineType, String shareLevel, String subdomain, String hs2ProxyUser) {
|
||||
String engineType, String shareLevel, String subdomain, String hs2ProxyUser, String all) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("type", engineType);
|
||||
params.put("sharelevel", shareLevel);
|
||||
params.put("subdomain", subdomain);
|
||||
params.put("hive.server2.proxy.user", hs2ProxyUser);
|
||||
params.put("all", all);
|
||||
Engine[] result =
|
||||
this.getClient()
|
||||
.get(API_BASE_PATH + "/engine", params, Engine[].class, client.getAuthHeader());
|
||||
|
||||
@ -286,7 +286,51 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
|
||||
@QueryParam("type") engineType: String,
|
||||
@QueryParam("sharelevel") shareLevel: String,
|
||||
@QueryParam("subdomain") subdomain: String,
|
||||
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Seq[Engine] = {
|
||||
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String,
|
||||
@QueryParam("all") @DefaultValue("false") all: String): Seq[Engine] = {
|
||||
if (all.toBoolean) {
|
||||
val userName = fe.getSessionUser(Map.empty[String, String])
|
||||
val ipAddress = fe.getIpAddress
|
||||
info(s"Received list all kyuubi engine request from $userName/$ipAddress")
|
||||
if (!isAdministrator(userName)) {
|
||||
throw new NotAllowedException(
|
||||
s"$userName is not allowed to list all kyuubi engine")
|
||||
}
|
||||
val engines = ListBuffer[Engine]()
|
||||
val engineSpace = fe.getConf.get(HA_NAMESPACE)
|
||||
val shareLevel = fe.getConf.get(ENGINE_SHARE_LEVEL)
|
||||
val engineType = fe.getConf.get(ENGINE_TYPE)
|
||||
withDiscoveryClient(fe.getConf) { discoveryClient =>
|
||||
val commonParent = s"/${engineSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType"
|
||||
info(s"Listing engine nodes for $commonParent")
|
||||
try {
|
||||
discoveryClient.getChildren(commonParent).map {
|
||||
user =>
|
||||
val engine = getEngine(user, engineType, shareLevel, "", "")
|
||||
val engineSpace = getEngineSpace(engine)
|
||||
discoveryClient.getChildren(engineSpace).map { child =>
|
||||
info(s"Listing engine nodes for $engineSpace/$child")
|
||||
engines ++= discoveryClient.getServiceNodesInfo(s"$engineSpace/$child").map(node =>
|
||||
new Engine(
|
||||
engine.getVersion,
|
||||
engine.getUser,
|
||||
engine.getEngineType,
|
||||
engine.getSharelevel,
|
||||
node.namespace.split("/").last,
|
||||
node.instance,
|
||||
node.namespace,
|
||||
node.attributes.asJava))
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case nne: NoNodeException =>
|
||||
error(s"No such engine for engine type: $engineType, share level: $shareLevel", nne)
|
||||
throw new NotFoundException(
|
||||
s"No such engine for engine type: $engineType, share level: $shareLevel")
|
||||
}
|
||||
}
|
||||
return engines.toSeq
|
||||
}
|
||||
val userName = if (isAdministrator(fe.getRealUser())) {
|
||||
Option(hs2ProxyUser).getOrElse(fe.getRealUser())
|
||||
} else {
|
||||
|
||||
@ -587,4 +587,155 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
assert("Running".equals(testServer.getStatus))
|
||||
}
|
||||
}
|
||||
|
||||
test("list all engine - user share level") {
|
||||
val id = UUID.randomUUID().toString
|
||||
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
|
||||
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
|
||||
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
|
||||
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
|
||||
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
|
||||
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
|
||||
|
||||
val engine =
|
||||
new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null)
|
||||
|
||||
val engineSpace = DiscoveryPaths.makePath(
|
||||
s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
|
||||
Utils.currentUser,
|
||||
"")
|
||||
|
||||
withDiscoveryClient(conf) { client =>
|
||||
engine.getOrCreate(client)
|
||||
|
||||
assert(client.pathExists(engineSpace))
|
||||
assert(client.getChildren(engineSpace).size == 1)
|
||||
|
||||
val response = webTarget.path("api/v1/admin/engine")
|
||||
.queryParam("all", "true")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
|
||||
.get
|
||||
|
||||
assert(200 == response.getStatus)
|
||||
val engines = response.readEntity(new GenericType[Seq[Engine]]() {})
|
||||
assert(engines.size == 1)
|
||||
assert(engines(0).getEngineType == "SPARK_SQL")
|
||||
assert(engines(0).getSharelevel == "USER")
|
||||
assert(engines(0).getSubdomain == "default")
|
||||
|
||||
// kill the engine application
|
||||
engineMgr.killApplication(ApplicationManagerInfo(None), id)
|
||||
eventually(timeout(30.seconds), interval(100.milliseconds)) {
|
||||
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists(
|
||||
_.state == ApplicationState.NOT_FOUND))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("list all engines - group share level") {
|
||||
val id = UUID.randomUUID().toString
|
||||
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
|
||||
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
|
||||
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
|
||||
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
|
||||
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
|
||||
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
|
||||
|
||||
val engine =
|
||||
new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null)
|
||||
|
||||
val engineSpace = DiscoveryPaths.makePath(
|
||||
s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
|
||||
fe.asInstanceOf[KyuubiRestFrontendService].sessionManager.groupProvider.primaryGroup(
|
||||
Utils.currentUser,
|
||||
null),
|
||||
"")
|
||||
|
||||
withDiscoveryClient(conf) { client =>
|
||||
engine.getOrCreate(client)
|
||||
|
||||
assert(client.pathExists(engineSpace))
|
||||
assert(client.getChildren(engineSpace).size == 1)
|
||||
|
||||
val response = webTarget.path("api/v1/admin/engine")
|
||||
.queryParam("all", "true")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
|
||||
.get
|
||||
|
||||
assert(200 == response.getStatus)
|
||||
val engines = response.readEntity(new GenericType[Seq[Engine]]() {})
|
||||
assert(engines.size == 1)
|
||||
assert(engines(0).getEngineType == "SPARK_SQL")
|
||||
assert(engines(0).getSharelevel == "GROUP")
|
||||
assert(engines(0).getSubdomain == "default")
|
||||
|
||||
// kill the engine application
|
||||
engineMgr.killApplication(ApplicationManagerInfo(None), id)
|
||||
eventually(timeout(30.seconds), interval(100.milliseconds)) {
|
||||
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists(
|
||||
_.state == ApplicationState.NOT_FOUND))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("list all engines - connection share level") {
|
||||
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
|
||||
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
|
||||
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
|
||||
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
|
||||
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
|
||||
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
|
||||
|
||||
val engineSpace = DiscoveryPaths.makePath(
|
||||
s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
|
||||
Utils.currentUser,
|
||||
"")
|
||||
|
||||
val id1 = UUID.randomUUID().toString
|
||||
val engine1 =
|
||||
new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id1, null)
|
||||
val engineSpace1 = DiscoveryPaths.makePath(
|
||||
s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
|
||||
Utils.currentUser,
|
||||
id1)
|
||||
|
||||
val id2 = UUID.randomUUID().toString
|
||||
val engine2 =
|
||||
new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id2, null)
|
||||
val engineSpace2 = DiscoveryPaths.makePath(
|
||||
s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
|
||||
Utils.currentUser,
|
||||
id2)
|
||||
|
||||
withDiscoveryClient(conf) { client =>
|
||||
engine1.getOrCreate(client)
|
||||
engine2.getOrCreate(client)
|
||||
|
||||
assert(client.pathExists(engineSpace))
|
||||
assert(client.getChildren(engineSpace).size == 2)
|
||||
assert(client.pathExists(engineSpace1))
|
||||
assert(client.pathExists(engineSpace2))
|
||||
|
||||
val response = webTarget.path("api/v1/admin/engine")
|
||||
.queryParam("all", "true")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
|
||||
.get
|
||||
assert(200 == response.getStatus)
|
||||
val result = response.readEntity(new GenericType[Seq[Engine]]() {})
|
||||
assert(result.size == 2)
|
||||
|
||||
// kill the engine application
|
||||
engineMgr.killApplication(ApplicationManagerInfo(None), id1)
|
||||
engineMgr.killApplication(ApplicationManagerInfo(None), id2)
|
||||
eventually(timeout(30.seconds), interval(100.milliseconds)) {
|
||||
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id1)
|
||||
.exists(_.state == ApplicationState.NOT_FOUND))
|
||||
assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id2)
|
||||
.exists(_.state == ApplicationState.NOT_FOUND))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,7 +74,7 @@ class AdminRestApiSuite extends RestClientTestHelper {
|
||||
.build()
|
||||
|
||||
val adminRestApi = new AdminRestApi(basicKyuubiRestClient)
|
||||
var engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala
|
||||
var engines = adminRestApi.listEngines("spark_sql", "user", "default", "", "false").asScala
|
||||
assert(engines.size == 1)
|
||||
assert(engines(0).getUser == user)
|
||||
assert(engines(0).getVersion == KYUUBI_VERSION)
|
||||
@ -87,7 +87,7 @@ class AdminRestApiSuite extends RestClientTestHelper {
|
||||
val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "")
|
||||
assert(result == s"Engine ${engineSpace} is deleted successfully.")
|
||||
|
||||
engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala
|
||||
engines = adminRestApi.listEngines("spark_sql", "user", "default", "", "false").asScala
|
||||
assert(engines.isEmpty)
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user