add test for close and cancel operation

This commit is contained in:
Kent Yao 2020-09-15 14:36:02 +08:00
parent 2309cfbac4
commit 07204dbef5
3 changed files with 85 additions and 7 deletions

View File

@ -22,7 +22,7 @@ import java.sql.{Date, SQLException, Timestamp}
import scala.collection.JavaConverters._
import org.apache.hive.service.cli.HiveSQLException
import org.apache.hive.service.rpc.thrift.{TCloseSessionReq, TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq, TStatus, TStatusCode}
import org.apache.hive.service.rpc.thrift._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types._
@ -746,7 +746,7 @@ class SparkOperationSuite extends WithSparkSQLEngine {
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle( tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
@ -800,4 +800,58 @@ class SparkOperationSuite extends WithSparkSQLEngine {
assert(status.getErrorMessage.contains("Database 'default2' does not exist"))
}
}
test("not allow to operate closed session or operation") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle)
val tCloseOperationResp = client.CloseOperation(tCloseOperationReq)
assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(1000)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" +
" [type=EXECUTE_STATEMENT, identifier:")
val tCloseSessionReq = new TCloseSessionReq()
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq)
val status = tExecuteStatementResp1.getStatus
assert(status.getStatusCode === TStatusCode.ERROR_STATUS)
assert(status.getErrorMessage startsWith s"Invalid SessionHandle [")
}
}
test("cancel operation") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setStatement("set")
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
val tCancelOperationReq = new TCancelOperationReq(tExecuteStatementResp.getOperationHandle)
val tCancelOperationResp = client.CancelOperation(tCancelOperationReq)
tCancelOperationResp
}
}
}

View File

@ -24,6 +24,7 @@ private[kyuubi] case class ConfigBuilder(key: String) {
private[config] var _doc = ""
private[config] var _version = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
def doc(s: String): ConfigBuilder = {
_doc = s
@ -35,6 +36,11 @@ private[kyuubi] case class ConfigBuilder(key: String) {
this
}
def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
_onCreate = Option(callback)
this
}
private def toNumber[T](s: String, converter: String => T, configType: String): T = {
try {
converter(s.trim)
@ -126,17 +132,26 @@ private[kyuubi] case class TypedConfigBuilder[T](
TypedConfigBuilder(parent, strToSeq(_, fromStr), seqToStr(_, toStr))
}
def createOptional: OptionalConfigEntry[T] = OptionalConfigEntry(
parent.key, fromStr, toStr, parent._doc, parent._version)
def createOptional: OptionalConfigEntry[T] = {
val entry = OptionalConfigEntry(parent.key, fromStr, toStr, parent._doc, parent._version)
parent._onCreate.foreach(_(entry))
entry
}
def createWithDefault(default: T): ConfigEntry[T] = default match {
case d: String => createWithDefaultString(d)
case _ =>
val d = fromStr(toStr(default))
ConfigEntryWithDefault(parent.key, d, fromStr, toStr, parent._doc, parent._version)
val entry =
ConfigEntryWithDefault(parent.key, d, fromStr, toStr, parent._doc, parent._version)
parent._onCreate.foreach(_(entry))
entry
}
def createWithDefaultString(default: String): ConfigEntryWithDefaultString[T] = {
ConfigEntryWithDefaultString(parent.key, default, fromStr, toStr, parent._doc, parent._version)
val entry = ConfigEntryWithDefaultString(
parent.key, default, fromStr, toStr, parent._doc, parent._version)
parent._onCreate.foreach(_(entry))
entry
}
}

View File

@ -122,7 +122,16 @@ object KyuubiConf {
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key)
val kyuubiConfEntries: java.util.Map[String, ConfigEntry[_]] =
java.util.Collections.synchronizedMap(new java.util.HashMap[String, ConfigEntry[_]]())
private def register(entry: ConfigEntry[_]): Unit = kyuubiConfEntries.synchronized {
require(!kyuubiConfEntries.containsKey(entry.key),
s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
kyuubiConfEntries.put(entry.key, entry)
}
def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key).onCreate(register)
val EMBEDDED_ZK_PORT: ConfigEntry[Int] = buildConf("embedded.zookeeper.port")
.doc("The port of the embedded zookeeper server")