diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java index 002f6b46d..0f6fbbc47 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java @@ -89,14 +89,17 @@ public class AdminRestApi { } public List listSessions() { - return listSessions(Collections.emptyList()); + return listSessions(Collections.emptyList(), null); } - public List listSessions(List users) { + public List listSessions(List users, String sessionType) { Map params = new HashMap<>(); if (users != null && !users.isEmpty()) { params.put("users", String.join(",", users)); } + if (StringUtils.isNotBlank(sessionType)) { + params.put("sessionType", sessionType); + } SessionData[] result = this.getClient() .get(API_BASE_PATH + "/sessions", params, SessionData[].class, client.getAuthHeader()); @@ -109,10 +112,11 @@ public class AdminRestApi { } public List listOperations() { - return listOperations(Collections.emptyList(), null); + return listOperations(Collections.emptyList(), null, null); } - public List listOperations(List users, String sessionHandleStr) { + public List listOperations( + List users, String sessionHandleStr, String sessionType) { Map params = new HashMap<>(); if (users != null && !users.isEmpty()) { params.put("users", String.join(",", users)); @@ -120,6 +124,9 @@ public class AdminRestApi { if (StringUtils.isNotBlank(sessionHandleStr)) { params.put("sessionHandle", sessionHandleStr); } + if (StringUtils.isNotBlank(sessionType)) { + params.put("sessionType", sessionType); + } OperationData[] result = this.getClient() .get( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index e31c792c3..2e61e6b08 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -168,7 +168,9 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { description = "get the list of all live sessions") @GET @Path("sessions") - def sessions(@QueryParam("users") users: String): Seq[SessionData] = { + def sessions( + @QueryParam("users") users: String, + @QueryParam("sessionType") sessionType: String): Seq[SessionData] = { val userName = fe.getSessionUser(Map.empty[String, String]) val ipAddress = fe.getIpAddress info(s"Received listing all live sessions request from $userName/$ipAddress") @@ -177,6 +179,10 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { s"$userName is not allowed to list all live sessions") } var sessions = fe.be.sessionManager.allSessions() + if (StringUtils.isNoneBlank(sessionType)) { + sessions = sessions.filter(session => + sessionType.equals(session.asInstanceOf[KyuubiSession].sessionType.toString)) + } if (StringUtils.isNotBlank(users)) { val usersSet = users.split(",").toSet sessions = sessions.filter(session => usersSet.contains(session.user)) @@ -214,7 +220,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { @Path("operations") def listOperations( @QueryParam("users") users: String, - @QueryParam("sessionHandle") sessionHandle: String): Seq[OperationData] = { + @QueryParam("sessionHandle") sessionHandle: String, + @QueryParam("sessionType") sessionType: String): Seq[OperationData] = { val userName = fe.getSessionUser(Map.empty[String, String]) val ipAddress = fe.getIpAddress info(s"Received listing all of the active operations request from $userName/$ipAddress") @@ -231,6 +238,11 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { operations = operations.filter(operation => operation.getSession.handle.equals(SessionHandle.fromUUID(sessionHandle))) } + if (StringUtils.isNotBlank(sessionType)) { + operations = operations.filter(operation => + sessionType.equalsIgnoreCase( + operation.getSession.asInstanceOf[KyuubiSession].sessionType.toString)) + } operations .map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index 95aa3de02..2360dea60 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -44,6 +44,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.util.HttpAuthUtils import org.apache.kyuubi.server.http.util.HttpAuthUtils.AUTHORIZATION_HEADER import org.apache.kyuubi.service.authentication.AnonymousAuthenticationProviderImpl +import org.apache.kyuubi.session.SessionType import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2 class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { @@ -230,6 +231,42 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { operations = response.readEntity(classOf[Seq[OperationData]]) assert(response.getStatus === 200) assert(operations.size == 1) + + response = webTarget.path("api/v1/admin/sessions") + .queryParam("sessionType", SessionType.INTERACTIVE.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + sessions = response.readEntity(classOf[Seq[SessionData]]) + assert(response.getStatus === 200) + assert(sessions.size > 0) + + response = webTarget.path("api/v1/admin/sessions") + .queryParam("sessionType", SessionType.BATCH.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + sessions = response.readEntity(classOf[Seq[SessionData]]) + assert(response.getStatus === 200) + assert(sessions.size == 0) + + response = webTarget.path("api/v1/admin/operations") + .queryParam("sessionType", SessionType.INTERACTIVE.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + operations = response.readEntity(classOf[Seq[OperationData]]) + assert(response.getStatus === 200) + assert(operations.size > 0) + + response = webTarget.path("api/v1/admin/operations") + .queryParam("sessionType", SessionType.BATCH.toString) + .request() + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .get() + operations = response.readEntity(classOf[Seq[OperationData]]) + assert(response.getStatus === 200) + assert(operations.size == 0) } test("list/close operations") {