[KYUUBI #4738] AdminResource.getEngineSpace should use primary group name on GROUP share level

### _Why are the changes needed?_

Closes #4738

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4740 from HaoYang670/4738_fix_GROUP_REST_get_engine.

Closes #4738

d044b2ac2 [remzi] fmt
68630250a [remzi] add tests
47f2dc205 [remzi] Merge remote-tracking branch 'upstream/master' into 4738_fix_GROUP_REST_get_engine
60d40f5b7 [remzi] fix style
c077fcd84 [remzi] fix style
1126c1c72 [remzi] Merge remote-tracking branch 'upstream/master' into 4738_fix_GROUP_REST_get_engine
1b03ef559 [remzi] address review comments
dbb6b17ec [remzi] restore gitignore
df528ff5b [remzi] ignore scala metals things
91789f968 [remzi] use group name

Authored-by: remzi <13716567376yh@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
remzi 2023-04-24 23:48:16 +08:00 committed by Cheng Pan
parent 7b97d2fa41
commit efe81ac0b1
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 101 additions and 3 deletions

View File

@ -52,7 +52,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
private def hadoopConf: Configuration = KyuubiServer.getHadoopConf()
private def sessionManager = be.sessionManager.asInstanceOf[KyuubiSessionManager]
private[kyuubi] def sessionManager = be.sessionManager.asInstanceOf[KyuubiSessionManager]
private val batchChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-checker")

View File

@ -355,9 +355,15 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
private def getEngineSpace(engine: Engine): String = {
val serverSpace = fe.getConf.get(HA_NAMESPACE)
val appUser = engine.getSharelevel match {
case "GROUP" =>
fe.sessionManager.groupProvider.primaryGroup(engine.getUser, fe.getConf.getAll.asJava)
case _ => engine.getUser
}
DiscoveryPaths.makePath(
s"${serverSpace}_${engine.getVersion}_${engine.getSharelevel}_${engine.getEngineType}",
engine.getUser,
appUser,
engine.getSubdomain)
}

View File

@ -35,7 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceDiscovery}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
@ -300,6 +300,52 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
}
test("delete engine - group share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
val engine =
new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null)
val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
fe.asInstanceOf[KyuubiRestFrontendService].sessionManager.groupProvider.primaryGroup(
Utils.currentUser,
null),
"default")
withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "GROUP")
.queryParam("type", "spark_sql")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
.delete()
assert(200 == response.getStatus)
assert(client.pathExists(engineSpace))
eventually(timeout(5.seconds), interval(100.milliseconds)) {
assert(client.getChildren(engineSpace).size == 0, s"refId same with $id?")
}
// kill the engine application
engineMgr.killApplication(None, id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND))
}
}
}
test("delete engine - connection share level") {
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
@ -378,6 +424,52 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
}
test("list engine - group share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
val engine =
new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null)
val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
fe.asInstanceOf[KyuubiRestFrontendService].sessionManager.groupProvider.primaryGroup(
Utils.currentUser,
null),
"")
withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
val response = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
.get
assert(200 == response.getStatus)
val engines = response.readEntity(new GenericType[Seq[Engine]]() {})
assert(engines.size == 1)
assert(engines(0).getEngineType == "SPARK_SQL")
assert(engines(0).getSharelevel == "GROUP")
assert(engines(0).getSubdomain == "default")
// kill the engine application
engineMgr.killApplication(None, id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND))
}
}
}
test("list engine - connection share level") {
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)