[KYUUBI #1631] Migrating existing rest fe tests to real cases
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> In this PR, we target the existing UTs from a noop server to a real shared Kyuubi server. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1631 from yaooqinn/resttest. Closes #1631 2726ab44 [Kent Yao] address comments c0da8090 [Kent Yao] ci 5d12f70f [Kent Yao] address comments 80e14044 [Kent Yao] Migrating existing rest fe test to real cases a31ab506 [Kent Yao] Migrating existing rest fe test to real cases 0c1feb45 [Kent Yao] Migrating existing rest fe test to real cases fbd9769e [Kent Yao] Migrating existing rest fe test to real cases 75cca5ff [Kent Yao] Migrating existing rest fe test to real cases 119712e8 [Kent Yao] Migrating existing rest fe test to real cases 8196e4df [Kent Yao] Migrating existing rest fe test to real cases Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
2af105a417
commit
e1587eeaf4
@ -76,7 +76,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
|
||||
|
||||
override def connectionUrl: String = {
|
||||
checkInitialized()
|
||||
s"${serverAddr.getCanonicalHostName}:$portNum"
|
||||
s"${serverAddr.getCanonicalHostName}:${connector.getLocalPort}"
|
||||
}
|
||||
|
||||
override def start(): Unit = {
|
||||
|
||||
@ -211,13 +211,15 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@PathParam("sessionHandle") sessionHandleStr: String,
|
||||
request: GetSchemasRequest): OperationHandle = {
|
||||
try {
|
||||
backendService.getSchemas(
|
||||
parseSessionHandle(sessionHandleStr),
|
||||
val sessionHandle = parseSessionHandle(sessionHandleStr)
|
||||
val operationHandle = backendService.getSchemas(
|
||||
sessionHandle,
|
||||
request.catalogName,
|
||||
request.schemaName)
|
||||
operationHandle
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
throw new NotFoundException(s"Error getting schemas")
|
||||
case NonFatal(e) =>
|
||||
throw new NotFoundException(s"Error getting schemas", e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.kyuubi
|
||||
|
||||
import java.net.URI
|
||||
import javax.ws.rs.client.WebTarget
|
||||
import javax.ws.rs.core.{Application, UriBuilder}
|
||||
|
||||
@ -28,9 +29,10 @@ import org.glassfish.jersey.test.spi.TestContainerFactory
|
||||
|
||||
import org.apache.kyuubi.RestFrontendTestHelper.RestApiBaseSuite
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.server.KyuubiRestFrontendService
|
||||
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols
|
||||
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
|
||||
import org.apache.kyuubi.server.api.KyuubiScalaObjectMapper
|
||||
import org.apache.kyuubi.service.NoopRestFrontendServer
|
||||
import org.apache.kyuubi.service.AbstractFrontendService
|
||||
|
||||
object RestFrontendTestHelper {
|
||||
|
||||
@ -46,40 +48,28 @@ object RestFrontendTestHelper {
|
||||
}
|
||||
}
|
||||
|
||||
trait RestFrontendTestHelper {
|
||||
trait RestFrontendTestHelper extends WithKyuubiServer {
|
||||
|
||||
val restFrontendHost: String = "localhost"
|
||||
val restFrontendPort: Int = KyuubiConf().get(KyuubiConf.FRONTEND_REST_BIND_PORT)
|
||||
override protected val conf: KyuubiConf = KyuubiConf()
|
||||
|
||||
def withKyuubiRestServer(
|
||||
f: (KyuubiRestFrontendService, String, Int, WebTarget) => Unit): Unit = {
|
||||
override protected val frontendProtocols: Seq[FrontendProtocol] =
|
||||
FrontendProtocols.REST :: Nil
|
||||
|
||||
val server = new NoopRestFrontendServer()
|
||||
server.stop()
|
||||
val conf = KyuubiConf()
|
||||
conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, Some(restFrontendHost))
|
||||
private val restApiBaseSuite = new RestApiBaseSuite
|
||||
|
||||
server.initialize(conf)
|
||||
server.start()
|
||||
|
||||
val restApiBaseSuite = new RestApiBaseSuite
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
restApiBaseSuite.setUp()
|
||||
// noinspection HttpUrlsUsage
|
||||
val baseUri = UriBuilder
|
||||
.fromUri(s"http://$restFrontendHost/")
|
||||
.port(restFrontendPort)
|
||||
.build()
|
||||
val webTarget = restApiBaseSuite.client.target(baseUri)
|
||||
|
||||
try {
|
||||
f(
|
||||
server.frontendServices.head,
|
||||
conf.get(KyuubiConf.FRONTEND_REST_BIND_HOST).get,
|
||||
restFrontendPort,
|
||||
webTarget)
|
||||
} finally {
|
||||
restApiBaseSuite.tearDown()
|
||||
server.stop()
|
||||
}
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
restApiBaseSuite.tearDown()
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
protected lazy val fe: AbstractFrontendService = server.frontendServices.head
|
||||
|
||||
protected lazy val baseUri: URI = UriBuilder.fromUri(s"http://${fe.connectionUrl}/").build()
|
||||
|
||||
protected lazy val webTarget: WebTarget = restApiBaseSuite.client.target(baseUri)
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.kyuubi
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_QUORUM}
|
||||
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
|
||||
import org.apache.kyuubi.server.KyuubiServer
|
||||
@ -28,7 +29,7 @@ trait WithKyuubiServer extends KyuubiFunSuite {
|
||||
|
||||
protected val conf: KyuubiConf
|
||||
|
||||
protected val frontendProtocols: Seq[FrontendProtocols.Value] =
|
||||
protected val frontendProtocols: Seq[FrontendProtocol] =
|
||||
FrontendProtocols.THRIFT_BINARY :: Nil
|
||||
|
||||
private var zkServer: EmbeddedZookeeper = _
|
||||
|
||||
@ -17,80 +17,36 @@
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import java.util.Locale
|
||||
import org.apache.kyuubi.RestFrontendTestHelper
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.service.NoopRestFrontendServer
|
||||
import org.apache.kyuubi.service.ServiceState._
|
||||
|
||||
class KyuubiRestFrontendServiceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
|
||||
test("kyuubi REST frontend service basic") {
|
||||
val server = new NoopRestFrontendServer()
|
||||
server.stop()
|
||||
val conf = KyuubiConf()
|
||||
assert(server.getServices.isEmpty)
|
||||
assert(server.getServiceState === LATENT)
|
||||
val e = intercept[IllegalStateException](server.frontendServices.head.connectionUrl)
|
||||
assert(e.getMessage startsWith "Illegal Service State: LATENT")
|
||||
assert(server.getConf === null)
|
||||
|
||||
server.initialize(conf)
|
||||
assert(server.getServiceState === INITIALIZED)
|
||||
val frontendService = server.frontendServices.head
|
||||
assert(frontendService.getServiceState == INITIALIZED)
|
||||
assert(server.frontendServices.head.connectionUrl.split(":").length === 2)
|
||||
assert(server.getConf === conf)
|
||||
assert(server.getStartTime === 0)
|
||||
server.stop()
|
||||
|
||||
server.start()
|
||||
assert(server.getServiceState === STARTED)
|
||||
assert(frontendService.getServiceState == STARTED)
|
||||
assert(server.getStartTime !== 0)
|
||||
|
||||
server.stop()
|
||||
assert(server.getServiceState === STOPPED)
|
||||
assert(frontendService.getServiceState == STOPPED)
|
||||
server.stop()
|
||||
}
|
||||
class KyuubiRestFrontendServiceSuite extends RestFrontendTestHelper {
|
||||
|
||||
test("kyuubi REST frontend service http basic") {
|
||||
withKyuubiRestServer { (_, host, port, _) =>
|
||||
eventually(timeout(10.seconds), interval(50.milliseconds)) {
|
||||
val html = {
|
||||
// noinspection HttpUrlsUsage
|
||||
val s = Source.fromURL(s"http://$host:$port/api/v1/ping")
|
||||
val str = s.mkString
|
||||
s.close()
|
||||
str
|
||||
}
|
||||
assert(html.toLowerCase(Locale.ROOT).equals("pong"))
|
||||
}
|
||||
}
|
||||
val resp = webTarget.path("/api/v1/ping").request().get()
|
||||
assert(resp.readEntity(classOf[String]) === "pong")
|
||||
}
|
||||
|
||||
test("test error and exception response") {
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
// send a not exists request
|
||||
var response = webTarget.path("api/v1/pong").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
assert(response.getStatusInfo.getReasonPhrase.equalsIgnoreCase("not found"))
|
||||
test("error and exception response") {
|
||||
var response = webTarget.path("api/v1/pong").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
assert(response.getStatusInfo.getReasonPhrase.equalsIgnoreCase("not found"))
|
||||
|
||||
// send a exists request but wrong http method
|
||||
response = webTarget.path("api/v1/ping").request().post(null)
|
||||
assert(405 == response.getStatus)
|
||||
assert(response.getStatusInfo.getReasonPhrase.equalsIgnoreCase("method not allowed"))
|
||||
response = webTarget.path("api/v1/ping").request().post(null)
|
||||
assert(405 == response.getStatus)
|
||||
assert(response.getStatusInfo.getReasonPhrase.equalsIgnoreCase("method not allowed"))
|
||||
|
||||
// send a request but throws a exception on the server side
|
||||
response = webTarget.path("api/v1/exception").request().get()
|
||||
assert(500 == response.getStatus)
|
||||
assert(response.getStatusInfo.getReasonPhrase.equalsIgnoreCase("server error"))
|
||||
}
|
||||
response = webTarget.path("api/v1/exception").request().get()
|
||||
assert(500 == response.getStatus)
|
||||
assert(response.getStatusInfo.getReasonPhrase.equalsIgnoreCase("server error"))
|
||||
}
|
||||
|
||||
test("swagger ui") {
|
||||
val resp = webTarget.path("/api/v1/swagger-ui").request().get()
|
||||
assert(resp.getStatus === 200)
|
||||
}
|
||||
|
||||
test("swagger ui json data") {
|
||||
val resp = webTarget.path("/openapi.json").request().get()
|
||||
assert(resp.getStatus === 200)
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,116 +17,103 @@
|
||||
|
||||
package org.apache.kyuubi.server.api.v1
|
||||
|
||||
import javax.ws.rs.client.{Entity, WebTarget}
|
||||
import javax.ws.rs.client.Entity
|
||||
import javax.ws.rs.core.MediaType
|
||||
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
|
||||
import org.apache.kyuubi.events.KyuubiOperationEvent
|
||||
import org.apache.kyuubi.operation.{ExecuteStatement, GetCatalogs, OperationState, OperationType}
|
||||
import org.apache.kyuubi.operation.{OperationState, OperationType}
|
||||
import org.apache.kyuubi.operation.OperationState.{FINISHED, OperationState}
|
||||
import org.apache.kyuubi.operation.OperationType.OperationType
|
||||
import org.apache.kyuubi.server.KyuubiRestFrontendService
|
||||
|
||||
class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
|
||||
test("test get an operation event") {
|
||||
withKyuubiRestServer { (fe, _, _, webTarget) =>
|
||||
val catalogsHandleStr = getOpHandleStr(fe, OperationType.GET_CATALOGS)
|
||||
var response = webTarget.path(s"api/v1/operations/$catalogsHandleStr/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
val operationEvent = response.readEntity(classOf[KyuubiOperationEvent])
|
||||
assert(200 == response.getStatus)
|
||||
assert(operationEvent.state == OperationState.INITIALIZED.name())
|
||||
test("get an operation event") {
|
||||
val catalogsHandleStr = getOpHandleStr(OperationType.GET_CATALOGS)
|
||||
checkOpState(catalogsHandleStr, FINISHED)
|
||||
|
||||
val statementHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
|
||||
response = webTarget.path(s"api/v1/operations/$statementHandleStr/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
val statementEvent = response.readEntity(classOf[KyuubiOperationEvent])
|
||||
assert(200 == response.getStatus)
|
||||
assert(statementEvent.state == OperationState.INITIALIZED.name())
|
||||
val statementHandleStr = getOpHandleStr(OperationType.EXECUTE_STATEMENT)
|
||||
checkOpState(statementHandleStr, FINISHED)
|
||||
|
||||
// Invalid operationHandleStr
|
||||
val invalidOperationHandle =
|
||||
statementHandleStr.replaceAll("EXECUTE_STATEMENT", "GET_TYPE_INFO")
|
||||
response = webTarget.path(s"api/v1/operations/$invalidOperationHandle/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
// Invalid operationHandleStr
|
||||
val invalidOperationHandle =
|
||||
statementHandleStr.replaceAll("EXECUTE_STATEMENT", "GET_TYPE_INFO")
|
||||
val response = webTarget.path(s"api/v1/operations/$invalidOperationHandle/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
|
||||
test("test apply an action for an operation") {
|
||||
withKyuubiRestServer { (fe, _, _, webTarget: WebTarget) =>
|
||||
val opHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
|
||||
test("apply an action for an operation") {
|
||||
val opHandleStr = getOpHandleStr(OperationType.EXECUTE_STATEMENT)
|
||||
|
||||
var response = webTarget.path(s"api/v1/operations/$opHandleStr")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
var response = webTarget.path(s"api/v1/operations/$opHandleStr")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
checkOpState(opHandleStr, OperationState.CANCELED)
|
||||
|
||||
response = webTarget.path(s"api/v1/operations/$opHandleStr/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
val operationEvent = response.readEntity(classOf[KyuubiOperationEvent])
|
||||
assert(operationEvent.state == OperationState.FINISHED.name() ||
|
||||
operationEvent.state == OperationState.CANCELED.name())
|
||||
|
||||
response = webTarget.path(s"api/v1/operations/$opHandleStr")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.entity(OpActionRequest("close"), MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
response = webTarget.path(s"api/v1/operations/$opHandleStr/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
response = webTarget.path(s"api/v1/operations/$opHandleStr")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.entity(OpActionRequest("close"), MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
response = webTarget.path(s"api/v1/operations/$opHandleStr/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
|
||||
test("test get result set metadata") {
|
||||
withKyuubiRestServer { (fe, _, _, webTarget: WebTarget) =>
|
||||
val opHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
|
||||
val response = webTarget.path(s"api/v1/operations/$opHandleStr/resultsetmetadata")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(200 == response.getStatus)
|
||||
val resultSetMetaData = response.readEntity(classOf[ResultSetMetaData])
|
||||
assert(resultSetMetaData.columns.head.columnName.equals("Result"))
|
||||
}
|
||||
test("get result set metadata") {
|
||||
val opHandleStr = getOpHandleStr(OperationType.EXECUTE_STATEMENT)
|
||||
checkOpState(opHandleStr, FINISHED)
|
||||
val response = webTarget.path(s"api/v1/operations/$opHandleStr/resultsetmetadata")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(200 == response.getStatus)
|
||||
val resultSetMetaData = response.readEntity(classOf[ResultSetMetaData])
|
||||
assert(resultSetMetaData.columns(1).columnName.equals("tableName"))
|
||||
}
|
||||
|
||||
test("test get operation log") {
|
||||
withKyuubiRestServer { (fe, _, _, webTarget: WebTarget) =>
|
||||
val opHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
|
||||
val response = webTarget.path(
|
||||
s"api/v1/operations/$opHandleStr/log")
|
||||
.queryParam("maxrows", "10")
|
||||
.request(MediaType.APPLICATION_JSON).get()
|
||||
assert(200 == response.getStatus)
|
||||
val logRowSet = response.readEntity(classOf[OperationLog])
|
||||
assert(logRowSet.logRowSet.head.equals("test"))
|
||||
assert(logRowSet.rowCount == 1)
|
||||
}
|
||||
test("get operation log") {
|
||||
val opHandleStr = getOpHandleStr(OperationType.EXECUTE_STATEMENT)
|
||||
checkOpState(opHandleStr, FINISHED)
|
||||
val response = webTarget.path(
|
||||
s"api/v1/operations/$opHandleStr/log")
|
||||
.queryParam("maxrows", "10")
|
||||
.request(MediaType.APPLICATION_JSON).get()
|
||||
assert(200 == response.getStatus)
|
||||
val logRowSet = response.readEntity(classOf[OperationLog])
|
||||
assert(logRowSet.logRowSet.exists(_.contains("show tables")))
|
||||
assert(logRowSet.rowCount === 10)
|
||||
}
|
||||
|
||||
def getOpHandleStr(fe: KyuubiRestFrontendService, typ: OperationType): String = {
|
||||
val sessionManager = fe.be.sessionManager
|
||||
val sessionHandle = sessionManager.openSession(
|
||||
def getOpHandleStr(typ: OperationType): String = {
|
||||
val sessionHandle = fe.be.openSession(
|
||||
HIVE_CLI_SERVICE_PROTOCOL_V2,
|
||||
"admin",
|
||||
"123456",
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
val session = sessionManager.getSession(sessionHandle)
|
||||
|
||||
val op = typ match {
|
||||
case OperationType.EXECUTE_STATEMENT =>
|
||||
new ExecuteStatement(session, "show tables", true, 3000)
|
||||
case OperationType.GET_CATALOGS =>
|
||||
new GetCatalogs(session)
|
||||
fe.be.executeStatement(sessionHandle, "show tables", runAsync = true, 3000)
|
||||
case OperationType.GET_CATALOGS => fe.be.getCatalogs(sessionHandle)
|
||||
}
|
||||
sessionManager.operationManager.addOperation(op)
|
||||
val operationHandle = op.getHandle
|
||||
|
||||
s"${operationHandle.identifier.publicId}|" +
|
||||
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
|
||||
s"${operationHandle.typ.toString}"
|
||||
s"${op.identifier.publicId}|" +
|
||||
s"${op.identifier.secretId}|${op.protocol.getValue}|" +
|
||||
s"${op.typ.toString}"
|
||||
}
|
||||
|
||||
private def checkOpState(opHandleStr: String, state: OperationState): Unit = {
|
||||
eventually(Timeout(5.seconds)) {
|
||||
val response = webTarget.path(s"api/v1/operations/$opHandleStr/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(response.getStatus === 200)
|
||||
val operationEvent = response.readEntity(classOf[KyuubiOperationEvent])
|
||||
assert(operationEvent.state === state.name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,8 +21,6 @@ import java.util
|
||||
import javax.ws.rs.client.Entity
|
||||
import javax.ws.rs.core.{MediaType, Response}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
|
||||
@ -34,7 +32,7 @@ import org.apache.kyuubi.session.SessionHandle
|
||||
|
||||
class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
|
||||
test("test open and count session") {
|
||||
test("open/close and count session") {
|
||||
val requestObj = SessionOpenRequest(
|
||||
1,
|
||||
"admin",
|
||||
@ -42,26 +40,37 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
var response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
var response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
|
||||
assert(200 == response.getStatus)
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
|
||||
assert(sessionHandle.protocol.getValue == 1)
|
||||
assert(sessionHandle.identifier != null)
|
||||
assert(sessionHandle.protocol.getValue == 1)
|
||||
assert(sessionHandle.identifier != null)
|
||||
|
||||
// verify the open session count
|
||||
response = webTarget.path("api/v1/sessions/count").request().get()
|
||||
val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
|
||||
assert(openedSessionCount.openSessionCount == 1)
|
||||
}
|
||||
val statistic = webTarget.path("api/v1/sessions/execPool/statistic").request().get()
|
||||
val execPoolStatistic1 = statistic.readEntity(classOf[ExecPoolStatistic])
|
||||
assert(execPoolStatistic1.execPoolSize == 1 && execPoolStatistic1.execPoolActiveCount == 1)
|
||||
|
||||
response = webTarget.path("api/v1/sessions/count").request().get()
|
||||
val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
|
||||
assert(openedSessionCount.openSessionCount == 1)
|
||||
|
||||
// close an opened session
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
response = webTarget.path("api/v1/sessions/count").request().get()
|
||||
val openedSessionCount2 = response.readEntity(classOf[SessionOpenCount])
|
||||
assert(openedSessionCount2.openSessionCount == 0)
|
||||
}
|
||||
|
||||
test("test close and count session") {
|
||||
test("getSessionList") {
|
||||
val requestObj = SessionOpenRequest(
|
||||
1,
|
||||
"admin",
|
||||
@ -69,56 +78,60 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
var response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
var response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
|
||||
assert(200 == response.getStatus)
|
||||
// get session list
|
||||
var response2 = webTarget.path("api/v1/sessions").request().get()
|
||||
assert(200 == response2.getStatus)
|
||||
val sessions1 = response2.readEntity(classOf[SessionList])
|
||||
assert(sessions1.sessionList.nonEmpty)
|
||||
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
// close an opened session
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
assert(sessionHandle.protocol.getValue == 1)
|
||||
assert(sessionHandle.identifier != null)
|
||||
|
||||
// close a opened session
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
// verify the open session count again
|
||||
response = webTarget.path("api/v1/sessions/count").request().get()
|
||||
val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
|
||||
assert(openedSessionCount.openSessionCount == 0)
|
||||
}
|
||||
// get session list again
|
||||
response2 = webTarget.path("api/v1/sessions").request().get()
|
||||
assert(200 == response2.getStatus)
|
||||
val sessions2 = response2.readEntity(classOf[SessionList])
|
||||
assert(sessions2.sessionList.isEmpty)
|
||||
}
|
||||
|
||||
test("test execPoolStatistic") {
|
||||
withKyuubiRestServer { (restFe, _, _, webTarget) =>
|
||||
val sessionManager = restFe.be.sessionManager
|
||||
val future = sessionManager.submitBackgroundOperation(() => Thread.sleep(1000))
|
||||
test("get session event") {
|
||||
val sessionManager = fe.be.sessionManager
|
||||
val sessionHandle = sessionManager.openSession(
|
||||
HIVE_CLI_SERVICE_PROTOCOL_V2,
|
||||
"admin",
|
||||
"123456",
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
|
||||
// verify the exec pool statistic
|
||||
var response = webTarget.path("api/v1/sessions/execPool/statistic").request().get()
|
||||
val execPoolStatistic1 = response.readEntity(classOf[ExecPoolStatistic])
|
||||
assert(execPoolStatistic1.execPoolSize == 1 && execPoolStatistic1.execPoolActiveCount == 1)
|
||||
KyuubiServer.kyuubiServer = new KyuubiServer
|
||||
KyuubiServer.kyuubiServer.initialize(KyuubiConf())
|
||||
|
||||
future.cancel(true)
|
||||
eventually(timeout(3.seconds), interval(200.milliseconds)) {
|
||||
response = webTarget.path("api/v1/sessions/execPool/statistic").request().get()
|
||||
val statistic = response.readEntity(classOf[ExecPoolStatistic])
|
||||
assert(statistic.execPoolSize == 1 && statistic.execPoolActiveCount == 0)
|
||||
}
|
||||
// get session event
|
||||
var response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().get()
|
||||
assert(200 == response.getStatus)
|
||||
val sessions = response.readEntity(classOf[KyuubiSessionEvent])
|
||||
assert(sessions.conf("testConfig").equals("testValue"))
|
||||
|
||||
sessionManager.stop()
|
||||
response = webTarget.path("api/v1/sessions/execPool/statistic").request().get()
|
||||
val execPoolStatistic3 = response.readEntity(classOf[ExecPoolStatistic])
|
||||
assert(execPoolStatistic3.execPoolSize == 0 && execPoolStatistic3.execPoolActiveCount == 0)
|
||||
}
|
||||
// close an opened session
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
// get session detail again
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
|
||||
test("test getSessionList") {
|
||||
test("get infoType") {
|
||||
val requestObj = SessionOpenRequest(
|
||||
1,
|
||||
"admin",
|
||||
@ -126,64 +139,37 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
var response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
var response: Response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
|
||||
// get session list
|
||||
var response2 = webTarget.path("api/v1/sessions").request().get()
|
||||
assert(200 == response2.getStatus)
|
||||
val sessions1 = response2.readEntity(classOf[SessionList])
|
||||
assert(sessions1.sessionList.nonEmpty)
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
|
||||
// close a opened session
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().delete()
|
||||
assert(200 == response.getStatus)
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/info/13")
|
||||
.request().get()
|
||||
assert(200 == response.getStatus)
|
||||
val sessions = response.readEntity(classOf[InfoDetail])
|
||||
assert(sessions.infoType.equals("CLI_SERVER_NAME") &&
|
||||
sessions.infoValue.equals("Apache Kyuubi (Incubating)"))
|
||||
// Invalid sessionHandleStr
|
||||
val handle = "b88d6b56-d200-4bb6-bf0a-5da0ea572e11|0c4aad4e-ccf7-4abd-9305-943d4bfd2d9a|0"
|
||||
response = webTarget.path(s"api/v1/sessions/$handle/info/13").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
response = webTarget.path(s"api/v1/sessions/0/info/13").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
|
||||
// get session list again
|
||||
response2 = webTarget.path("api/v1/sessions").request().get()
|
||||
assert(200 == response2.getStatus)
|
||||
val sessions2 = response2.readEntity(classOf[SessionList])
|
||||
assert(sessions2.sessionList.isEmpty)
|
||||
}
|
||||
// Invalid infoType
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/info/0")
|
||||
.request().get()
|
||||
assert(404 == response.getStatus)
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/info/str")
|
||||
.request().get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
|
||||
test("test get session event") {
|
||||
withKyuubiRestServer { (fe, _, _, webTarget) =>
|
||||
val sessionManager = fe.be.sessionManager
|
||||
val sessionHandle = sessionManager.openSession(
|
||||
HIVE_CLI_SERVICE_PROTOCOL_V2,
|
||||
"admin",
|
||||
"123456",
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
|
||||
KyuubiServer.kyuubiServer = new KyuubiServer
|
||||
KyuubiServer.kyuubiServer.initialize(KyuubiConf())
|
||||
|
||||
// get session event
|
||||
var response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().get()
|
||||
assert(200 == response.getStatus)
|
||||
val sessions = response.readEntity(classOf[KyuubiSessionEvent])
|
||||
assert(sessions.conf("testConfig").equals("testValue"))
|
||||
|
||||
// close a opened session
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
// get session detail again
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
}
|
||||
|
||||
test("test get infoType") {
|
||||
test("submit operation and get operation handle") {
|
||||
val requestObj = SessionOpenRequest(
|
||||
1,
|
||||
"admin",
|
||||
@ -191,39 +177,78 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
var response: Response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
var response: Response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/info/13")
|
||||
.request().get()
|
||||
assert(200 == response.getStatus)
|
||||
val sessions = response.readEntity(classOf[InfoDetail])
|
||||
assert(sessions.infoType.equals("CLI_SERVER_NAME") &&
|
||||
sessions.infoValue.equals("Apache Kyuubi (Incubating)"))
|
||||
// Invalid sessionHandleStr
|
||||
val handle = "b88d6b56-d200-4bb6-bf0a-5da0ea572e11|0c4aad4e-ccf7-4abd-9305-943d4bfd2d9a|0"
|
||||
response = webTarget.path(s"api/v1/sessions/$handle/info/13").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
response = webTarget.path(s"api/v1/sessions/0/info/13").request().get()
|
||||
assert(404 == response.getStatus)
|
||||
val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
|
||||
|
||||
// Invalid infoType
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/info/0")
|
||||
.request().get()
|
||||
assert(404 == response.getStatus)
|
||||
response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/info/str")
|
||||
.request().get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
val statementReq = StatementRequest("show tables", true, 3000)
|
||||
response = webTarget
|
||||
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
var operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.EXECUTE_STATEMENT)
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/typeInfo").request()
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_TYPE_INFO)
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/catalogs")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_CATALOGS)
|
||||
|
||||
val getSchemasReq = GetSchemasRequest("spark_catalog", "default")
|
||||
response = webTarget.path(s"$pathPrefix/operations/schemas")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getSchemasReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_SCHEMAS)
|
||||
|
||||
val tableTypes = new util.ArrayList[String]()
|
||||
val getTablesReq = GetTablesRequest("spark_catalog", "default", "default", tableTypes)
|
||||
response = webTarget.path(s"$pathPrefix/operations/tables")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getTablesReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_TABLES)
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/tableTypes").request()
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_TABLE_TYPES)
|
||||
|
||||
val getColumnsReq = GetColumnsRequest("spark_catalog", "default", "default", "default")
|
||||
response = webTarget.path(s"$pathPrefix/operations/columns")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getColumnsReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_COLUMNS)
|
||||
|
||||
val getFunctionsReq = GetFunctionsRequest("default", "default", "default")
|
||||
response = webTarget.path(s"$pathPrefix/operations/functions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getFunctionsReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
|
||||
}
|
||||
|
||||
test("test submit operation and get operation handle") {
|
||||
test("close an operation") {
|
||||
val requestObj = SessionOpenRequest(
|
||||
1,
|
||||
"admin",
|
||||
@ -231,118 +256,34 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
var response: Response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
var response: Response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
|
||||
val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
|
||||
val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
|
||||
|
||||
val statementReq = StatementRequest("show tables", true, 3000)
|
||||
response = webTarget
|
||||
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
var operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.EXECUTE_STATEMENT)
|
||||
response = webTarget.path(s"$pathPrefix/operations/catalogs")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
val operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_CATALOGS)
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/typeInfo").request()
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_TYPE_INFO)
|
||||
val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
|
||||
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
|
||||
s"${operationHandle.typ.toString}"
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/catalogs")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_CATALOGS)
|
||||
response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
val getSchemasReq = GetSchemasRequest("default", "default")
|
||||
response = webTarget.path(s"$pathPrefix/operations/schemas")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getSchemasReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_SCHEMAS)
|
||||
|
||||
val tableTypes = new util.ArrayList[String]()
|
||||
val getTablesReq = GetTablesRequest("default", "default", "default", tableTypes)
|
||||
response = webTarget.path(s"$pathPrefix/operations/tables")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getTablesReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_TABLES)
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/tableTypes").request()
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_TABLE_TYPES)
|
||||
|
||||
val getColumnsReq = GetColumnsRequest("default", "default", "default", "default")
|
||||
response = webTarget.path(s"$pathPrefix/operations/columns")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getColumnsReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_COLUMNS)
|
||||
|
||||
var getFunctionsReq = GetFunctionsRequest("default", "default", "default")
|
||||
response = webTarget.path(s"$pathPrefix/operations/functions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(getFunctionsReq, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
|
||||
}
|
||||
}
|
||||
|
||||
test("test close an operation") {
|
||||
val requestObj = SessionOpenRequest(
|
||||
1,
|
||||
"admin",
|
||||
"123456",
|
||||
"localhost",
|
||||
Map("testConfig" -> "testValue"))
|
||||
|
||||
withKyuubiRestServer { (_, _, _, webTarget) =>
|
||||
var response: Response = webTarget.path("api/v1/sessions")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
|
||||
|
||||
val sessionHandle = response.readEntity(classOf[SessionHandle])
|
||||
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
|
||||
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
|
||||
|
||||
val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/catalogs")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
|
||||
assert(200 == response.getStatus)
|
||||
var operationHandle = response.readEntity(classOf[OperationHandle])
|
||||
assert(operationHandle.typ == OperationType.GET_CATALOGS)
|
||||
|
||||
val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
|
||||
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
|
||||
s"${operationHandle.typ.toString}"
|
||||
|
||||
response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).delete()
|
||||
assert(200 == response.getStatus)
|
||||
|
||||
// verify operation
|
||||
response = webTarget.path(s"api/v1/operations/$serializedOperationHandle/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(404 == response.getStatus)
|
||||
|
||||
}
|
||||
// verify operation
|
||||
response = webTarget.path(s"api/v1/operations/$serializedOperationHandle/event")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE).get()
|
||||
assert(404 == response.getStatus)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,25 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.service
|
||||
|
||||
import org.apache.kyuubi.server.KyuubiRestFrontendService
|
||||
|
||||
class NoopRestFrontendServer extends AbstractNoopServer("NoopRestFrontendServer") {
|
||||
|
||||
override val frontendServices = Seq(new KyuubiRestFrontendService(this))
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user