diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala index cd191afe8..c9d571008 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala @@ -52,7 +52,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable) private def hadoopConf: Configuration = KyuubiServer.getHadoopConf() - private def sessionManager = be.sessionManager.asInstanceOf[KyuubiSessionManager] + private[kyuubi] def sessionManager = be.sessionManager.asInstanceOf[KyuubiSessionManager] private val batchChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-checker") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 113660a41..cf8478021 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -355,9 +355,15 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { private def getEngineSpace(engine: Engine): String = { val serverSpace = fe.getConf.get(HA_NAMESPACE) + val appUser = engine.getSharelevel match { + case "GROUP" => + fe.sessionManager.groupProvider.primaryGroup(engine.getUser, fe.getConf.getAll.asJava) + case _ => engine.getUser + } + DiscoveryPaths.makePath( s"${serverSpace}_${engine.getVersion}_${engine.getSharelevel}_${engine.getEngineType}", - engine.getUser, + appUser, engine.getSubdomain) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index b7650627e..f7a086de4 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -35,7 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager} import org.apache.kyuubi.engine.EngineType.SPARK_SQL -import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER} +import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER} import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceDiscovery} import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient @@ -300,6 +300,52 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { } } + test("delete engine - group share level") { + val id = UUID.randomUUID().toString + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test") + conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L) + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engine = + new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + + val engineSpace = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL", + fe.asInstanceOf[KyuubiRestFrontendService].sessionManager.groupProvider.primaryGroup( + Utils.currentUser, + null), + "default") + + withDiscoveryClient(conf) { client => + engine.getOrCreate(client) + + assert(client.pathExists(engineSpace)) + assert(client.getChildren(engineSpace).size == 1) + + val response = webTarget.path("api/v1/admin/engine") + .queryParam("sharelevel", "GROUP") + .queryParam("type", "spark_sql") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .delete() + + assert(200 == response.getStatus) + assert(client.pathExists(engineSpace)) + eventually(timeout(5.seconds), interval(100.milliseconds)) { + assert(client.getChildren(engineSpace).size == 0, s"refId same with $id?") + } + + // kill the engine application + engineMgr.killApplication(None, id) + eventually(timeout(30.seconds), interval(100.milliseconds)) { + assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND)) + } + } + } + test("delete engine - connection share level") { conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString) conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) @@ -378,6 +424,52 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { } } + test("list engine - group share level") { + val id = UUID.randomUUID().toString + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test") + conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L) + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engine = + new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + + val engineSpace = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL", + fe.asInstanceOf[KyuubiRestFrontendService].sessionManager.groupProvider.primaryGroup( + Utils.currentUser, + null), + "") + + withDiscoveryClient(conf) { client => + engine.getOrCreate(client) + + assert(client.pathExists(engineSpace)) + assert(client.getChildren(engineSpace).size == 1) + + val response = webTarget.path("api/v1/admin/engine") + .queryParam("type", "spark_sql") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .get + + assert(200 == response.getStatus) + val engines = response.readEntity(new GenericType[Seq[Engine]]() {}) + assert(engines.size == 1) + assert(engines(0).getEngineType == "SPARK_SQL") + assert(engines(0).getSharelevel == "GROUP") + assert(engines(0).getSubdomain == "default") + + // kill the engine application + engineMgr.killApplication(None, id) + eventually(timeout(30.seconds), interval(100.milliseconds)) { + assert(engineMgr.getApplicationInfo(None, id).exists(_.state == ApplicationState.NOT_FOUND)) + } + } + } + test("list engine - connection share level") { conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString) conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)