From 07204dbef5c90df70adfa9d96634e70dc5e77e22 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 15 Sep 2020 14:36:02 +0800 Subject: [PATCH] add test for close and cancel operation --- .../spark/operation/SparkOperationSuite.scala | 58 ++++++++++++++++++- .../apache/kyuubi/config/ConfigBuilder.scala | 23 ++++++-- .../org/apache/kyuubi/config/KyuubiConf.scala | 11 +++- 3 files changed, 85 insertions(+), 7 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index c25473e25..85f28f064 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -22,7 +22,7 @@ import java.sql.{Date, SQLException, Timestamp} import scala.collection.JavaConverters._ import org.apache.hive.service.cli.HiveSQLException -import org.apache.hive.service.rpc.thrift.{TCloseSessionReq, TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq, TStatus, TStatusCode} +import org.apache.hive.service.rpc.thrift._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.types._ @@ -746,7 +746,7 @@ class SparkOperationSuite extends WithSparkSQLEngine { val tOpenSessionResp = client.OpenSession(req) val tExecuteStatementReq = new TExecuteStatementReq() - tExecuteStatementReq.setSessionHandle( tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) tExecuteStatementReq.setStatement("set") val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) @@ -800,4 +800,58 @@ class SparkOperationSuite extends WithSparkSQLEngine { assert(status.getErrorMessage.contains("Database 'default2' does not exist")) } } + + test("not allow to operate closed session or operation") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val tOpenSessionResp = client.OpenSession(req) + + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setStatement("set") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + + val tCloseOperationReq = new TCloseOperationReq(tExecuteStatementResp.getOperationHandle) + val tCloseOperationResp = client.CloseOperation(tCloseOperationReq) + assert(tCloseOperationResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(tExecuteStatementResp.getOperationHandle) + tFetchResultsReq.setFetchType(0) + tFetchResultsReq.setMaxRows(1000) + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(tFetchResultsResp.getStatus.getErrorMessage startsWith "Invalid OperationHandle" + + " [type=EXECUTE_STATEMENT, identifier:") + + val tCloseSessionReq = new TCloseSessionReq() + tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + val tCloseSessionResp = client.CloseSession(tCloseSessionReq) + assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val tExecuteStatementResp1 = client.ExecuteStatement(tExecuteStatementReq) + + val status = tExecuteStatementResp1.getStatus + assert(status.getStatusCode === TStatusCode.ERROR_STATUS) + assert(status.getErrorMessage startsWith s"Invalid SessionHandle [") + } + } + + test("cancel operation") { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername("kentyao") + req.setPassword("anonymous") + val tOpenSessionResp = client.OpenSession(req) + + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setStatement("set") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + val tCancelOperationReq = new TCancelOperationReq(tExecuteStatementResp.getOperationHandle) + val tCancelOperationResp = client.CancelOperation(tCancelOperationReq) + tCancelOperationResp + } + } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala index 1e1bac94c..f7bfa29b5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala @@ -24,6 +24,7 @@ private[kyuubi] case class ConfigBuilder(key: String) { private[config] var _doc = "" private[config] var _version = "" + private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None def doc(s: String): ConfigBuilder = { _doc = s @@ -35,6 +36,11 @@ private[kyuubi] case class ConfigBuilder(key: String) { this } + def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = { + _onCreate = Option(callback) + this + } + private def toNumber[T](s: String, converter: String => T, configType: String): T = { try { converter(s.trim) @@ -126,17 +132,26 @@ private[kyuubi] case class TypedConfigBuilder[T]( TypedConfigBuilder(parent, strToSeq(_, fromStr), seqToStr(_, toStr)) } - def createOptional: OptionalConfigEntry[T] = OptionalConfigEntry( - parent.key, fromStr, toStr, parent._doc, parent._version) + def createOptional: OptionalConfigEntry[T] = { + val entry = OptionalConfigEntry(parent.key, fromStr, toStr, parent._doc, parent._version) + parent._onCreate.foreach(_(entry)) + entry + } def createWithDefault(default: T): ConfigEntry[T] = default match { case d: String => createWithDefaultString(d) case _ => val d = fromStr(toStr(default)) - ConfigEntryWithDefault(parent.key, d, fromStr, toStr, parent._doc, parent._version) + val entry = + ConfigEntryWithDefault(parent.key, d, fromStr, toStr, parent._doc, parent._version) + parent._onCreate.foreach(_(entry)) + entry } def createWithDefaultString(default: String): ConfigEntryWithDefaultString[T] = { - ConfigEntryWithDefaultString(parent.key, default, fromStr, toStr, parent._doc, parent._version) + val entry = ConfigEntryWithDefaultString( + parent.key, default, fromStr, toStr, parent._doc, parent._version) + parent._onCreate.foreach(_(entry)) + entry } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index e18b06f08..c17fd9e1a 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -122,7 +122,16 @@ object KyuubiConf { final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf" final val KYUUBI_HOME = "KYUUBI_HOME" - def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key) + val kyuubiConfEntries: java.util.Map[String, ConfigEntry[_]] = + java.util.Collections.synchronizedMap(new java.util.HashMap[String, ConfigEntry[_]]()) + + private def register(entry: ConfigEntry[_]): Unit = kyuubiConfEntries.synchronized { + require(!kyuubiConfEntries.containsKey(entry.key), + s"Duplicate SQLConfigEntry. ${entry.key} has been registered") + kyuubiConfEntries.put(entry.key, entry) + } + + def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key).onCreate(register) val EMBEDDED_ZK_PORT: ConfigEntry[Int] = buildConf("embedded.zookeeper.port") .doc("The port of the embedded zookeeper server")