From 6624cb93f66742b15566ecbdbc861c0d70e6e6c3 Mon Sep 17 00:00:00 2001 From: simon Date: Tue, 26 Oct 2021 15:45:02 +0800 Subject: [PATCH] [KYUUBI #1276] [KYUUBI #1275] Implement api: /${version}/sessions and /${version}/sessions/{sessionHandle} ### _Why are the changes needed?_ This is a subtask of umbrella issue #KPIP-1 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1276 from simon824/h2. Closes #1276 Closes #1275 f82958d1 [simon] fix ut 3fa9e95f [simon] response 404 2b0158d7 [simon] fix ut 3ccdec0a [simon] fix ut f538376d [simon] fix ut af12ba15 [simon] sessiondetails 67a0a8b1 [simon] add sessionHandle f6b5b543 [simon] fix codestyle d6539d8b [simon] #1275 34cdb757 [simon] #1275 b7e47435 [simon] h2 Authored-by: simon Signed-off-by: ulysses-you --- .../kyuubi/session/SessionManager.scala | 4 ++ .../server/api/v1/SessionsResource.scala | 33 +++++++++- .../org/apache/kyuubi/server/api/v1/dto.scala | 22 +++++++ .../server/api/v1/SessionsResourceSuite.scala | 65 ++++++++++++++++++- 4 files changed, 122 insertions(+), 2 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index a18576240..927fa9a34 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -107,6 +107,10 @@ abstract class SessionManager(name: String) extends CompositeService(name) { def getOpenSessionCount: Int = handleToSession.size() + def getSessionList(): ConcurrentHashMap[SessionHandle, Session] = { + handleToSession + } + def getExecPoolSize: Int = { assert(execPool != null) execPool.getPoolSize diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala index 173329ace..ab14c34ed 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala @@ -18,9 +18,11 @@ package org.apache.kyuubi.server.api.v1 import java.util.UUID -import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces} +import javax.ws.rs._ import javax.ws.rs.core.{MediaType, Response} +import scala.collection.JavaConverters._ + import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.cli.HandleIdentifier @@ -30,6 +32,35 @@ import org.apache.kyuubi.session.SessionHandle @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class SessionsResource extends ApiRequestContext { + @GET + def sessionInfoList(): SessionList = { + SessionList( + backendService.sessionManager.getSessionList().asScala.map { + case (handle, session) => + SessionOverview(session.user, session.ipAddress, session.createTime, handle) + }.toList + ) + } + + @GET + @Path("{sessionHandle}") + def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = { + val splitSessionHandle = sessionHandleStr.split("\\|") + val handleIdentifier = new HandleIdentifier( + UUID.fromString(splitSessionHandle(0)), UUID.fromString(splitSessionHandle(1))) + val protocolVersion = TProtocolVersion.findByValue(splitSessionHandle(2).toInt) + val sessionHandle = new SessionHandle(handleIdentifier, protocolVersion) + + try { + val session = backendService.sessionManager.getSession(sessionHandle) + SessionDetail(session.user, session.ipAddress, session.createTime, sessionHandle, + session.lastAccessTime, session.lastIdleTime, session.getNoOperationTime, session.conf) + } catch { + case _: Throwable => + throw new NotFoundException() + } + } + @GET @Path("count") def sessionCount(): SessionOpenCount = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala index 48131f322..7b8482197 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala @@ -17,10 +17,32 @@ package org.apache.kyuubi.server.api.v1 +import org.apache.kyuubi.session.SessionHandle + case class SessionOpenCount(openSessionCount: Int) case class ExecPoolStatistic(execPoolSize: Int, execPoolActiveCount: Int) +case class SessionList(sessionList: List[SessionOverview]) + +case class SessionOverview( + user: String, + ipAddr: String, + createTime: Long, + sessionHandle: SessionHandle +) + +case class SessionDetail( + user: String, + ipAddr: String, + createTime: Long, + sessionHandle: SessionHandle, + lastAccessTime: Long, + lastIdleTime: Long, + noOperationTime: Long, + configs: Map[String, String] +) + case class SessionOpenRequest( protocolVersion: Int, user: String, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala index dc6008616..8ddb8b109 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.server.api.v1 import javax.ws.rs.client.Entity -import javax.ws.rs.core.MediaType +import javax.ws.rs.core.{MediaType, Response} import org.junit.Test @@ -111,4 +111,67 @@ class SessionsResourceSuite extends RestApiBaseSuite { } } + @Test + def testGetSessionList: Unit = { + val requestObj = SessionOpenRequest( + 1, "admin", "123456", "localhost", Map("testConfig" -> "testValue")) + + RestFrontendServiceSuite.withKyuubiRestServer { + (_, _, _) => + var response = target(s"api/v1/sessions") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + + // get session list + var response2 = target("api/v1/sessions").request().get() + assert(200 == response2.getStatus) + val sessions1 = response2.readEntity(classOf[SessionList]) + assert(sessions1.sessionList.nonEmpty) + + // close a opened session + val sessionHandle = response.readEntity(classOf[SessionHandle]) + val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" + + s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}" + response = target(s"api/v1/sessions/$serializedSessionHandle").request().delete() + assert(200 == response.getStatus) + + // get session list again + response2 = target("api/v1/sessions").request().get() + assert(200 == response2.getStatus) + val sessions2 = response2.readEntity(classOf[SessionList]) + assert(sessions2.sessionList.isEmpty) + } + } + + @Test + def testGetSessionDetail: Unit = { + val requestObj = SessionOpenRequest( + 1, "admin", "123456", "localhost", Map("testConfig" -> "testValue")) + + RestFrontendServiceSuite.withKyuubiRestServer { + (_, _, _) => + var response: Response = target(s"api/v1/sessions") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + + val sessionHandle = response.readEntity(classOf[SessionHandle]) + val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" + + s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}" + + // get session detail + response = target(s"api/v1/sessions/$serializedSessionHandle").request().get() + assert(200 == response.getStatus) + var sessions = response.readEntity(classOf[SessionDetail]) + assert(sessions.configs.nonEmpty) + + // close a opened session + response = target(s"api/v1/sessions/$serializedSessionHandle").request().delete() + assert(200 == response.getStatus) + + // get session detail again + response = target(s"api/v1/sessions/$serializedSessionHandle").request().get() + assert(404 == response.getStatus) + + } + } }