[KYUUBI #1276] [KYUUBI #1275] Implement api: /${version}/sessions and /${version}/sessions/{sessionHandle}

### _Why are the changes needed?_
This is a subtask of umbrella issue #KPIP-1

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1276 from simon824/h2.

Closes #1276

Closes #1275

f82958d1 [simon] fix ut
3fa9e95f [simon] response 404
2b0158d7 [simon] fix ut
3ccdec0a [simon] fix ut
f538376d [simon] fix ut
af12ba15 [simon] sessiondetails
67a0a8b1 [simon] add sessionHandle
f6b5b543 [simon] fix codestyle
d6539d8b [simon] #1275
34cdb757 [simon] #1275
b7e47435 [simon] h2

Authored-by: simon <zhangshiming@cvte.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
simon 2021-10-26 15:45:02 +08:00 committed by ulysses-you
parent 046317c715
commit 6624cb93f6
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
4 changed files with 122 additions and 2 deletions

View File

@ -107,6 +107,10 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
def getOpenSessionCount: Int = handleToSession.size()
def getSessionList(): ConcurrentHashMap[SessionHandle, Session] = {
handleToSession
}
def getExecPoolSize: Int = {
assert(execPool != null)
execPool.getPoolSize

View File

@ -18,9 +18,11 @@
package org.apache.kyuubi.server.api.v1
import java.util.UUID
import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces}
import javax.ws.rs._
import javax.ws.rs.core.{MediaType, Response}
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.cli.HandleIdentifier
@ -30,6 +32,35 @@ import org.apache.kyuubi.session.SessionHandle
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SessionsResource extends ApiRequestContext {
@GET
def sessionInfoList(): SessionList = {
SessionList(
backendService.sessionManager.getSessionList().asScala.map {
case (handle, session) =>
SessionOverview(session.user, session.ipAddress, session.createTime, handle)
}.toList
)
}
@GET
@Path("{sessionHandle}")
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = {
val splitSessionHandle = sessionHandleStr.split("\\|")
val handleIdentifier = new HandleIdentifier(
UUID.fromString(splitSessionHandle(0)), UUID.fromString(splitSessionHandle(1)))
val protocolVersion = TProtocolVersion.findByValue(splitSessionHandle(2).toInt)
val sessionHandle = new SessionHandle(handleIdentifier, protocolVersion)
try {
val session = backendService.sessionManager.getSession(sessionHandle)
SessionDetail(session.user, session.ipAddress, session.createTime, sessionHandle,
session.lastAccessTime, session.lastIdleTime, session.getNoOperationTime, session.conf)
} catch {
case _: Throwable =>
throw new NotFoundException()
}
}
@GET
@Path("count")
def sessionCount(): SessionOpenCount = {

View File

@ -17,10 +17,32 @@
package org.apache.kyuubi.server.api.v1
import org.apache.kyuubi.session.SessionHandle
case class SessionOpenCount(openSessionCount: Int)
case class ExecPoolStatistic(execPoolSize: Int, execPoolActiveCount: Int)
case class SessionList(sessionList: List[SessionOverview])
case class SessionOverview(
user: String,
ipAddr: String,
createTime: Long,
sessionHandle: SessionHandle
)
case class SessionDetail(
user: String,
ipAddr: String,
createTime: Long,
sessionHandle: SessionHandle,
lastAccessTime: Long,
lastIdleTime: Long,
noOperationTime: Long,
configs: Map[String, String]
)
case class SessionOpenRequest(
protocolVersion: Int,
user: String,

View File

@ -18,7 +18,7 @@
package org.apache.kyuubi.server.api.v1
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.{MediaType, Response}
import org.junit.Test
@ -111,4 +111,67 @@ class SessionsResourceSuite extends RestApiBaseSuite {
}
}
@Test
def testGetSessionList: Unit = {
val requestObj = SessionOpenRequest(
1, "admin", "123456", "localhost", Map("testConfig" -> "testValue"))
RestFrontendServiceSuite.withKyuubiRestServer {
(_, _, _) =>
var response = target(s"api/v1/sessions")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
// get session list
var response2 = target("api/v1/sessions").request().get()
assert(200 == response2.getStatus)
val sessions1 = response2.readEntity(classOf[SessionList])
assert(sessions1.sessionList.nonEmpty)
// close a opened session
val sessionHandle = response.readEntity(classOf[SessionHandle])
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
response = target(s"api/v1/sessions/$serializedSessionHandle").request().delete()
assert(200 == response.getStatus)
// get session list again
response2 = target("api/v1/sessions").request().get()
assert(200 == response2.getStatus)
val sessions2 = response2.readEntity(classOf[SessionList])
assert(sessions2.sessionList.isEmpty)
}
}
@Test
def testGetSessionDetail: Unit = {
val requestObj = SessionOpenRequest(
1, "admin", "123456", "localhost", Map("testConfig" -> "testValue"))
RestFrontendServiceSuite.withKyuubiRestServer {
(_, _, _) =>
var response: Response = target(s"api/v1/sessions")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
val sessionHandle = response.readEntity(classOf[SessionHandle])
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
// get session detail
response = target(s"api/v1/sessions/$serializedSessionHandle").request().get()
assert(200 == response.getStatus)
var sessions = response.readEntity(classOf[SessionDetail])
assert(sessions.configs.nonEmpty)
// close a opened session
response = target(s"api/v1/sessions/$serializedSessionHandle").request().delete()
assert(200 == response.getStatus)
// get session detail again
response = target(s"api/v1/sessions/$serializedSessionHandle").request().get()
assert(404 == response.getStatus)
}
}
}