From d299da77b3657801e30392b2db0aca2cbed23e56 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Sun, 21 Apr 2024 18:18:19 -0700 Subject: [PATCH] [KYUUBI #6049] Support to filter sessions/operations with session type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description Support to filter sessions/operations with session type. ## Issue References ๐Ÿ”— This pull request fixes # ## Describe Your Solution ๐Ÿ”ง Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6049 from turboFei/batch_interactive. Closes #6049 68390c774 [Wang, Fei] add ut bfc2cb8c2 [Fei Wang] save c1979c7ea [Fei Wang] saev Lead-authored-by: Fei Wang Co-authored-by: Wang, Fei Signed-off-by: Wang, Fei --- .../apache/kyuubi/client/AdminRestApi.java | 15 ++++++-- .../kyuubi/server/api/v1/AdminResource.scala | 16 +++++++- .../server/api/v1/AdminResourceSuite.scala | 37 +++++++++++++++++++ 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java index 002f6b46d..0f6fbbc47 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java @@ -89,14 +89,17 @@ public class AdminRestApi { } public List listSessions() { - return listSessions(Collections.emptyList()); + return listSessions(Collections.emptyList(), null); } - public List listSessions(List users) { + public List listSessions(List users, String sessionType) { Map params = new HashMap<>(); if (users != null && !users.isEmpty()) { params.put("users", String.join(",", users)); } + if (StringUtils.isNotBlank(sessionType)) { + params.put("sessionType", sessionType); + } SessionData[] result = this.getClient() .get(API_BASE_PATH + "/sessions", params, SessionData[].class, client.getAuthHeader()); @@ -109,10 +112,11 @@ public class AdminRestApi { } public List listOperations() { - return listOperations(Collections.emptyList(), null); + return listOperations(Collections.emptyList(), null, null); } - public List listOperations(List users, String sessionHandleStr) { + public List listOperations( + List users, String sessionHandleStr, String sessionType) { Map params = new HashMap<>(); if (users != null && !users.isEmpty()) { params.put("users", String.join(",", users)); @@ -120,6 +124,9 @@ public class AdminRestApi { if (StringUtils.isNotBlank(sessionHandleStr)) { params.put("sessionHandle", sessionHandleStr); } + if (StringUtils.isNotBlank(sessionType)) { + params.put("sessionType", sessionType); + } OperationData[] result = this.getClient() .get( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index e31c792c3..2e61e6b08 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -168,7 +168,9 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { description = "get the list of all live sessions") @GET @Path("sessions") - def sessions(@QueryParam("users") users: String): Seq[SessionData] = { + def sessions( + @QueryParam("users") users: String, + @QueryParam("sessionType") sessionType: String): Seq[SessionData] = { val userName = fe.getSessionUser(Map.empty[String, String]) val ipAddress = fe.getIpAddress info(s"Received listing all live sessions request from $userName/$ipAddress") @@ -177,6 +179,10 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { s"$userName is not allowed to list all live sessions") } var sessions = fe.be.sessionManager.allSessions() + if (StringUtils.isNoneBlank(sessionType)) { + sessions = sessions.filter(session => + sessionType.equals(session.asInstanceOf[KyuubiSession].sessionType.toString)) + } if (StringUtils.isNotBlank(users)) { val usersSet = users.split(",").toSet sessions = sessions.filter(session => usersSet.contains(session.user)) @@ -214,7 +220,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { @Path("operations") def listOperations( @QueryParam("users") users: String, - @QueryParam("sessionHandle") sessionHandle: String): Seq[OperationData] = { + @QueryParam("sessionHandle") sessionHandle: String, + @QueryParam("sessionType") sessionType: String): Seq[OperationData] = { val userName = fe.getSessionUser(Map.empty[String, String]) val ipAddress = fe.getIpAddress info(s"Received listing all of the active operations request from $userName/$ipAddress") @@ -231,6 +238,11 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { operations = operations.filter(operation => operation.getSession.handle.equals(SessionHandle.fromUUID(sessionHandle))) } + if (StringUtils.isNotBlank(sessionType)) { + operations = operations.filter(operation => + sessionType.equalsIgnoreCase( + operation.getSession.asInstanceOf[KyuubiSession].sessionType.toString)) + } operations .map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index 95aa3de02..2360dea60 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -44,6 +44,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.util.HttpAuthUtils import org.apache.kyuubi.server.http.util.HttpAuthUtils.AUTHORIZATION_HEADER import org.apache.kyuubi.service.authentication.AnonymousAuthenticationProviderImpl +import org.apache.kyuubi.session.SessionType import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2 class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { @@ -230,6 +231,42 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { operations = response.readEntity(classOf[Seq[OperationData]]) assert(response.getStatus === 200) assert(operations.size == 1) + + response = webTarget.path("api/v1/admin/sessions") + .queryParam("sessionType", SessionType.INTERACTIVE.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + sessions = response.readEntity(classOf[Seq[SessionData]]) + assert(response.getStatus === 200) + assert(sessions.size > 0) + + response = webTarget.path("api/v1/admin/sessions") + .queryParam("sessionType", SessionType.BATCH.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + sessions = response.readEntity(classOf[Seq[SessionData]]) + assert(response.getStatus === 200) + assert(sessions.size == 0) + + response = webTarget.path("api/v1/admin/operations") + .queryParam("sessionType", SessionType.INTERACTIVE.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + operations = response.readEntity(classOf[Seq[OperationData]]) + assert(response.getStatus === 200) + assert(operations.size > 0) + + response = webTarget.path("api/v1/admin/operations") + .queryParam("sessionType", SessionType.BATCH.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + operations = response.readEntity(classOf[Seq[OperationData]]) + assert(response.getStatus === 200) + assert(operations.size == 0) } test("list/close operations") {