[KYUUBI #6049] Support to filter sessions/operations with session type

# 🔍 Description
Support to filter sessions/operations with session type.
## Issue References 🔗

This pull request fixes #

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6049 from turboFei/batch_interactive.

Closes #6049

68390c774 [Wang, Fei] add ut
bfc2cb8c2 [Fei Wang] save
c1979c7ea [Fei Wang] saev

Lead-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Fei Wang 2024-04-21 18:18:19 -07:00 committed by Wang, Fei
parent 90491fc07e
commit d299da77b3
3 changed files with 62 additions and 6 deletions

View File

@ -89,14 +89,17 @@ public class AdminRestApi {
}
public List<SessionData> listSessions() {
return listSessions(Collections.emptyList());
return listSessions(Collections.emptyList(), null);
}
public List<SessionData> listSessions(List<String> users) {
public List<SessionData> listSessions(List<String> users, String sessionType) {
Map<String, Object> params = new HashMap<>();
if (users != null && !users.isEmpty()) {
params.put("users", String.join(",", users));
}
if (StringUtils.isNotBlank(sessionType)) {
params.put("sessionType", sessionType);
}
SessionData[] result =
this.getClient()
.get(API_BASE_PATH + "/sessions", params, SessionData[].class, client.getAuthHeader());
@ -109,10 +112,11 @@ public class AdminRestApi {
}
public List<OperationData> listOperations() {
return listOperations(Collections.emptyList(), null);
return listOperations(Collections.emptyList(), null, null);
}
public List<OperationData> listOperations(List<String> users, String sessionHandleStr) {
public List<OperationData> listOperations(
List<String> users, String sessionHandleStr, String sessionType) {
Map<String, Object> params = new HashMap<>();
if (users != null && !users.isEmpty()) {
params.put("users", String.join(",", users));
@ -120,6 +124,9 @@ public class AdminRestApi {
if (StringUtils.isNotBlank(sessionHandleStr)) {
params.put("sessionHandle", sessionHandleStr);
}
if (StringUtils.isNotBlank(sessionType)) {
params.put("sessionType", sessionType);
}
OperationData[] result =
this.getClient()
.get(

View File

@ -168,7 +168,9 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
description = "get the list of all live sessions")
@GET
@Path("sessions")
def sessions(@QueryParam("users") users: String): Seq[SessionData] = {
def sessions(
@QueryParam("users") users: String,
@QueryParam("sessionType") sessionType: String): Seq[SessionData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received listing all live sessions request from $userName/$ipAddress")
@ -177,6 +179,10 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
s"$userName is not allowed to list all live sessions")
}
var sessions = fe.be.sessionManager.allSessions()
if (StringUtils.isNoneBlank(sessionType)) {
sessions = sessions.filter(session =>
sessionType.equals(session.asInstanceOf[KyuubiSession].sessionType.toString))
}
if (StringUtils.isNotBlank(users)) {
val usersSet = users.split(",").toSet
sessions = sessions.filter(session => usersSet.contains(session.user))
@ -214,7 +220,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
@Path("operations")
def listOperations(
@QueryParam("users") users: String,
@QueryParam("sessionHandle") sessionHandle: String): Seq[OperationData] = {
@QueryParam("sessionHandle") sessionHandle: String,
@QueryParam("sessionType") sessionType: String): Seq[OperationData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received listing all of the active operations request from $userName/$ipAddress")
@ -231,6 +238,11 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
operations = operations.filter(operation =>
operation.getSession.handle.equals(SessionHandle.fromUUID(sessionHandle)))
}
if (StringUtils.isNotBlank(sessionType)) {
operations = operations.filter(operation =>
sessionType.equalsIgnoreCase(
operation.getSession.asInstanceOf[KyuubiSession].sessionType.toString))
}
operations
.map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq
}

View File

@ -44,6 +44,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.util.HttpAuthUtils
import org.apache.kyuubi.server.http.util.HttpAuthUtils.AUTHORIZATION_HEADER
import org.apache.kyuubi.service.authentication.AnonymousAuthenticationProviderImpl
import org.apache.kyuubi.session.SessionType
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@ -230,6 +231,42 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
operations = response.readEntity(classOf[Seq[OperationData]])
assert(response.getStatus === 200)
assert(operations.size == 1)
response = webTarget.path("api/v1/admin/sessions")
.queryParam("sessionType", SessionType.INTERACTIVE.toString)
.request()
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.get()
sessions = response.readEntity(classOf[Seq[SessionData]])
assert(response.getStatus === 200)
assert(sessions.size > 0)
response = webTarget.path("api/v1/admin/sessions")
.queryParam("sessionType", SessionType.BATCH.toString)
.request()
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.get()
sessions = response.readEntity(classOf[Seq[SessionData]])
assert(response.getStatus === 200)
assert(sessions.size == 0)
response = webTarget.path("api/v1/admin/operations")
.queryParam("sessionType", SessionType.INTERACTIVE.toString)
.request()
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.get()
operations = response.readEntity(classOf[Seq[OperationData]])
assert(response.getStatus === 200)
assert(operations.size > 0)
response = webTarget.path("api/v1/admin/operations")
.queryParam("sessionType", SessionType.BATCH.toString)
.request()
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.get()
operations = response.readEntity(classOf[Seq[OperationData]])
assert(response.getStatus === 200)
assert(operations.size == 0)
}
test("list/close operations") {