diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index a18576240..927fa9a34 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -107,6 +107,10 @@ abstract class SessionManager(name: String) extends CompositeService(name) { def getOpenSessionCount: Int = handleToSession.size() + def getSessionList(): ConcurrentHashMap[SessionHandle, Session] = { + handleToSession + } + def getExecPoolSize: Int = { assert(execPool != null) execPool.getPoolSize diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala index 173329ace..ab14c34ed 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala @@ -18,9 +18,11 @@ package org.apache.kyuubi.server.api.v1 import java.util.UUID -import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces} +import javax.ws.rs._ import javax.ws.rs.core.{MediaType, Response} +import scala.collection.JavaConverters._ + import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.cli.HandleIdentifier @@ -30,6 +32,35 @@ import org.apache.kyuubi.session.SessionHandle @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class SessionsResource extends ApiRequestContext { + @GET + def sessionInfoList(): SessionList = { + SessionList( + backendService.sessionManager.getSessionList().asScala.map { + case (handle, session) => + SessionOverview(session.user, session.ipAddress, session.createTime, handle) + }.toList + ) + } + + @GET + @Path("{sessionHandle}") + def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = { + val splitSessionHandle = sessionHandleStr.split("\\|") + val handleIdentifier = new HandleIdentifier( + UUID.fromString(splitSessionHandle(0)), UUID.fromString(splitSessionHandle(1))) + val protocolVersion = TProtocolVersion.findByValue(splitSessionHandle(2).toInt) + val sessionHandle = new SessionHandle(handleIdentifier, protocolVersion) + + try { + val session = backendService.sessionManager.getSession(sessionHandle) + SessionDetail(session.user, session.ipAddress, session.createTime, sessionHandle, + session.lastAccessTime, session.lastIdleTime, session.getNoOperationTime, session.conf) + } catch { + case _: Throwable => + throw new NotFoundException() + } + } + @GET @Path("count") def sessionCount(): SessionOpenCount = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala index 48131f322..7b8482197 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala @@ -17,10 +17,32 @@ package org.apache.kyuubi.server.api.v1 +import org.apache.kyuubi.session.SessionHandle + case class SessionOpenCount(openSessionCount: Int) case class ExecPoolStatistic(execPoolSize: Int, execPoolActiveCount: Int) +case class SessionList(sessionList: List[SessionOverview]) + +case class SessionOverview( + user: String, + ipAddr: String, + createTime: Long, + sessionHandle: SessionHandle +) + +case class SessionDetail( + user: String, + ipAddr: String, + createTime: Long, + sessionHandle: SessionHandle, + lastAccessTime: Long, + lastIdleTime: Long, + noOperationTime: Long, + configs: Map[String, String] +) + case class SessionOpenRequest( protocolVersion: Int, user: String, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala index dc6008616..8ddb8b109 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.server.api.v1 import javax.ws.rs.client.Entity -import javax.ws.rs.core.MediaType +import javax.ws.rs.core.{MediaType, Response} import org.junit.Test @@ -111,4 +111,67 @@ class SessionsResourceSuite extends RestApiBaseSuite { } } + @Test + def testGetSessionList: Unit = { + val requestObj = SessionOpenRequest( + 1, "admin", "123456", "localhost", Map("testConfig" -> "testValue")) + + RestFrontendServiceSuite.withKyuubiRestServer { + (_, _, _) => + var response = target(s"api/v1/sessions") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + + // get session list + var response2 = target("api/v1/sessions").request().get() + assert(200 == response2.getStatus) + val sessions1 = response2.readEntity(classOf[SessionList]) + assert(sessions1.sessionList.nonEmpty) + + // close a opened session + val sessionHandle = response.readEntity(classOf[SessionHandle]) + val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" + + s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}" + response = target(s"api/v1/sessions/$serializedSessionHandle").request().delete() + assert(200 == response.getStatus) + + // get session list again + response2 = target("api/v1/sessions").request().get() + assert(200 == response2.getStatus) + val sessions2 = response2.readEntity(classOf[SessionList]) + assert(sessions2.sessionList.isEmpty) + } + } + + @Test + def testGetSessionDetail: Unit = { + val requestObj = SessionOpenRequest( + 1, "admin", "123456", "localhost", Map("testConfig" -> "testValue")) + + RestFrontendServiceSuite.withKyuubiRestServer { + (_, _, _) => + var response: Response = target(s"api/v1/sessions") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + + val sessionHandle = response.readEntity(classOf[SessionHandle]) + val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" + + s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}" + + // get session detail + response = target(s"api/v1/sessions/$serializedSessionHandle").request().get() + assert(200 == response.getStatus) + var sessions = response.readEntity(classOf[SessionDetail]) + assert(sessions.configs.nonEmpty) + + // close a opened session + response = target(s"api/v1/sessions/$serializedSessionHandle").request().delete() + assert(200 == response.getStatus) + + // get session detail again + response = target(s"api/v1/sessions/$serializedSessionHandle").request().get() + assert(404 == response.getStatus) + + } + } }