FrontendServiceSuite get catalogs
This commit is contained in:
parent
6efa589a74
commit
64cfb8cd95
@ -20,10 +20,11 @@ package org.apache.kyuubi.operation
|
||||
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
|
||||
|
||||
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
|
||||
import org.apache.kyuubi.operation.OperationType.OperationType
|
||||
import org.apache.kyuubi.session.Session
|
||||
|
||||
class NoopOperation(session: Session)
|
||||
extends AbstractOperation(OperationType.UNKNOWN_OPERATION, session) {
|
||||
class NoopOperation(typ: OperationType, session: Session)
|
||||
extends AbstractOperation(typ, session) {
|
||||
override protected def runInternal(): Unit = {}
|
||||
|
||||
override protected def beforeRun(): Unit = {}
|
||||
|
||||
@ -29,17 +29,17 @@ class NoopOperationManager extends OperationManager("noop") {
|
||||
runAsync: Boolean,
|
||||
queryTimeout: Long): Operation = {
|
||||
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.EXECUTE_STATEMENT, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
override def newGetTypeInfoOperation(session: Session): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_TYPE_INFO, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
override def newGetCatalogsOperation(session: Session): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_CATALOGS, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ class NoopOperationManager extends OperationManager("noop") {
|
||||
session: Session,
|
||||
catalog: String,
|
||||
schema: String): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_SCHEMAS, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
@ -57,12 +57,12 @@ class NoopOperationManager extends OperationManager("noop") {
|
||||
schemaName: String,
|
||||
tableName: String,
|
||||
tableTypes: java.util.List[String]): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_TABLES, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
override def newGetTableTypesOperation(session: Session): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_TABLE_TYPES, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
@ -72,7 +72,7 @@ class NoopOperationManager extends OperationManager("noop") {
|
||||
schemaName: String,
|
||||
tableName: String,
|
||||
columnName: String): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_COLUMNS, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
@ -81,7 +81,7 @@ class NoopOperationManager extends OperationManager("noop") {
|
||||
catalogName: String,
|
||||
schemaName: String,
|
||||
functionName: String): Operation = {
|
||||
val operation = new NoopOperation(session)
|
||||
val operation = new NoopOperation(OperationType.GET_FUNCTIONS, session)
|
||||
addOperation(operation)
|
||||
}
|
||||
|
||||
|
||||
@ -19,7 +19,7 @@ package org.apache.kyuubi.service
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle, TStatusCode}
|
||||
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TGetCatalogsReq, TOpenSessionReq, TOperationType, TSessionHandle, TStatus, TStatusCode}
|
||||
import org.apache.thrift.protocol.TBinaryProtocol
|
||||
import org.apache.thrift.transport.TSocket
|
||||
|
||||
@ -80,7 +80,6 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
test("open session") {
|
||||
withThriftClient { client =>
|
||||
val req = new TOpenSessionReq()
|
||||
@ -100,4 +99,14 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
assert(cause.getMessage === "Asked to fail")
|
||||
}
|
||||
}
|
||||
|
||||
test("get catalogs") {
|
||||
withSessionHandle { (client, handle) =>
|
||||
val req = new TGetCatalogsReq(handle)
|
||||
val resp = client.GetCatalogs(req)
|
||||
val opHandle = resp.getOperationHandle
|
||||
assert(opHandle.getOperationType === TOperationType.GET_CATALOGS)
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,7 +28,5 @@ class NoopSessionImpl(
|
||||
sessionManager: SessionManager)
|
||||
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
|
||||
|
||||
override def handle: SessionHandle = SessionHandle(protocol)
|
||||
|
||||
override def open(): Unit = {}
|
||||
}
|
||||
|
||||
@ -34,6 +34,8 @@ class NoopSessionManager extends SessionManager("noop") {
|
||||
if (conf.get("kyuubi.test.should.fail").exists(_.toBoolean)) {
|
||||
throw KyuubiSQLException("Asked to fail")
|
||||
}
|
||||
new NoopSessionImpl(protocol, user, password, ipAddress, conf, this).handle
|
||||
val session = new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
|
||||
setSession(session.handle, session)
|
||||
session.handle
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,41 +49,6 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
test("new frontend service") {
|
||||
val feService = new FrontendService(server.beService)
|
||||
feService.getConf should be(null)
|
||||
feService.getStartTime should be(0)
|
||||
feService.getServiceState should be(State.NOT_INITED)
|
||||
feService.getName should be(classOf[FrontendService].getSimpleName)
|
||||
feService.getServerIPAddress should be(null)
|
||||
feService.getPortNumber should be(0)
|
||||
}
|
||||
|
||||
test("start fe service") {
|
||||
val feService = new FrontendService(server.beService)
|
||||
intercept[IllegalStateException](feService.start())
|
||||
feService.init(conf)
|
||||
feService.start()
|
||||
feService.getConf should be(conf)
|
||||
feService.getStartTime should not be 0
|
||||
feService.getServiceState should be(State.STARTED)
|
||||
}
|
||||
|
||||
test("get catalogs") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TGetCatalogsReq(handle)
|
||||
val resp = fe.GetCatalogs(req)
|
||||
val req2 = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
|
||||
val resp2 = fe.FetchResults(req2)
|
||||
val rows = resp2.getResults.getRows
|
||||
rows.size() should be(0)
|
||||
val closeReq = new TCloseSessionReq(handle)
|
||||
fe.CloseSession(closeReq)
|
||||
val afterCloseResp = fe.GetCatalogs(req)
|
||||
afterCloseResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
test("get table types") {
|
||||
withFEServiceAndHandle { case (fe, handle) =>
|
||||
val req = new TGetTableTypesReq(handle)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user