diff --git a/conf/kyuubi-defaults.conf.template b/conf/kyuubi-defaults.conf.template
index 34558100e..077177f90 100644
--- a/conf/kyuubi-defaults.conf.template
+++ b/conf/kyuubi-defaults.conf.template
@@ -24,6 +24,7 @@
#
# spark.master local
# spark.ui.enabled false
+# spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
## Hadoop Configurations, they will override those in $HADOOP_CONF_DIR
#
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index e459caaf5..e414c1cbe 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -90,6 +90,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
#
# spark.master local
# spark.ui.enabled false
+# spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
## Hadoop Configurations, they will override those in $HADOOP_CONF_DIR
#
@@ -106,6 +107,19 @@ kyuubi\.authentication
\.ldap\.domain|
<undefined>
|SPACE character separated LDAP connection URL(s).
|1.0.0
kyuubi\.authentication
\.sasl\.qop|auth
|Sasl QOP enable higher levels of protection for Kyuubi communication with clients.
- auth - authentication only (default)
- auth-int - authentication plus integrity protection
- auth-conf - authentication plus integrity and confidentiality protection. This is applicable only if Kyuubi is configured to use Kerberos authentication.
|1.0.0
+### Backend
+
+Key | Default | Meaning | Since
+--- | --- | --- | ---
+kyuubi\.backend\.engine
\.exec\.pool\.keepalive
\.time|PT1M
|Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications
|1.0.0
+kyuubi\.backend\.engine
\.exec\.pool\.shutdown
\.timeout|PT10S
|Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications
|1.0.0
+kyuubi\.backend\.engine
\.exec\.pool\.size|100
|Number of threads in the operation execution thread pool of SQL engine applications
|1.0.0
+kyuubi\.backend\.engine
\.exec\.pool\.wait\.queue
\.size|100
|Size of the wait queue for the operation execution thread pool in SQL engine applications
|1.0.0
+kyuubi\.backend\.server
\.exec\.pool\.keepalive
\.time|PT1M
|Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server
|1.0.0
+kyuubi\.backend\.server
\.exec\.pool\.shutdown
\.timeout|PT10S
|Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server
|1.0.0
+kyuubi\.backend\.server
\.exec\.pool\.size|100
|Number of threads in the operation execution thread pool of Kyuubi server
|1.0.0
+kyuubi\.backend\.server
\.exec\.pool\.wait\.queue
\.size|100
|Size of the wait queue for the operation execution thread pool of Kyuubi server
|1.0.0
+
### Delegation
Key | Default | Meaning | Since
@@ -156,21 +170,17 @@ kyuubi\.kinit
\.principal|PT3H
|Operation will be closed when it's not accessed for this duration of time
|1.0.0
+kyuubi\.operation
\.status\.polling
\.timeout|PT5S
|Timeout(ms) for long polling asynchronous running sql query's status
|1.0.0
### Session
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.session\.check
\.interval|PT5M
|The check interval for session timeout.
|1.0.0
-kyuubi\.session\.engine
\.backend\.pool
\.keepalive\.time|PT1M
|Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating
|1.0.0
-kyuubi\.session\.engine
\.backend\.pool\.shutdown
\.timeout|PT10S
|Timeout(ms) for the operation execution thread pool to terminate
|1.0.0
-kyuubi\.session\.engine
\.backend\.pool\.size|100
|Number of threads in the operation execution thread pool for SQL engine applications
|1.0.0
-kyuubi\.session\.engine
\.backend\.pool\.wait
\.queue\.size|100
|Size of the wait queue for the operation execution thread pool in SQL engine applications
|1.0.0
kyuubi\.session\.engine
\.check\.interval|PT10M
|The check interval for engine timeout
|1.0.0
kyuubi\.session\.engine
\.idle\.timeout|PT30M
|engine timeout, the engine will self-terminate when it's not accessed for this duration
|1.0.0
kyuubi\.session\.engine
\.initialize\.timeout|PT1M
|Timeout for starting the background engine, e.g. SparkSQLEngine.
|1.0.0
kyuubi\.session\.engine
\.login\.timeout|PT15S
|The timeout(ms) of creating the connection to remote sql query engine
|1.0.0
-kyuubi\.session\.engine
\.long\.polling\.timeout|PT5S
|Timeout(ms) for long polling asynchronous running sql query's status
|1.0.0
kyuubi\.session\.engine
\.spark\.main\.resource|<undefined>
|The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default
|1.0.0
kyuubi\.session
\.timeout|PT6H
|session timeout, it will be closed when it's not accessed for this duration
|1.0.0
diff --git a/docs/tools/debugging.md b/docs/tools/debugging.md
index 5fef551bb..5e34b0b69 100644
--- a/docs/tools/debugging.md
+++ b/docs/tools/debugging.md
@@ -9,6 +9,8 @@
You can use the [Java Debug Wire Protocol](https://docs.oracle.com/javase/8/docs/technotes/guides/jpda/conninv.html#Plugin) to debug Kyuubi
w/ your favorite IDE tool, e.g. Intellij IDEA.
+## Debugging Server
+
We can configure the JDWP agent in `KYUUBI_JAVA_OPTS` for debugging.
@@ -24,3 +26,16 @@ In the IDE, you set the corresponding parameters(host&port) in debug configurati

+
+## Debugging Apps
+
+- Spark Driver
+
+```bash
+spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
+```
+
+- Spark Executor
+```bash
+spark.executor.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
+```
\ No newline at end of file
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
index 65e75a8c6..4c9434d24 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLBackendService.scala
@@ -17,16 +17,10 @@
package org.apache.kyuubi.engine.spark
-import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
-
-import scala.concurrent.CancellationException
-
import org.apache.spark.sql.SparkSession
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager
-import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationStatus}
+import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.service.BackendService
import org.apache.kyuubi.session.Session
@@ -42,29 +36,7 @@ import org.apache.kyuubi.session.SessionManager
class SparkSQLBackendService(name: String, spark: SparkSession)
extends AbstractBackendService(name) {
- private lazy val timeout = conf.get(KyuubiConf.ENGINE_LONG_POLLING_TIMEOUT)
-
def this(spark: SparkSession) = this(classOf[SparkSQLBackendService].getSimpleName, spark)
override val sessionManager: SessionManager = new SparkSQLSessionManager(spark)
-
- override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
- val operation = sessionManager.operationManager.getOperation(operationHandle)
- operation match {
- case es: ExecuteStatement if es.shouldRunAsync =>
- try {
- es.backgroundHandle.get(timeout, TimeUnit.MILLISECONDS)
- } catch {
- case e: TimeoutException =>
- debug(s"$operationHandle: Long polling timed out, ${e.getMessage}")
- case e: CancellationException =>
- debug(s"$operationHandle: The background operation was cancelled, ${e.getMessage}")
- case e: ExecutionException =>
- debug(s"$operationHandle: The background operation was aborted, ${e.getMessage}")
- case _: InterruptedException =>
- }
- case _ =>
- }
- super.getOperationStatus(operationHandle)
- }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 733a6bbbf..32f9ecf34 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
-import java.util.concurrent.{Future, RejectedExecutionException}
+import java.util.concurrent.RejectedExecutionException
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType
@@ -25,7 +25,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
-import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.session.Session
@@ -37,8 +36,6 @@ class ExecuteStatement(
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
private var result: DataFrame = _
- @volatile private var _backgroundHandle: Future[_] = _
- def backgroundHandle: Future[_] = _backgroundHandle
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@@ -49,6 +46,7 @@ class ExecuteStatement(
}
override protected def beforeRun(): Unit = {
+ OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.PENDING)
setHasResultSet(true)
}
@@ -59,7 +57,6 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
- OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
@@ -78,13 +75,16 @@ class ExecuteStatement(
override protected def runInternal(): Unit = {
if (shouldRunAsync) {
val asyncOperation = new Runnable {
- override def run(): Unit = executeStatement()
+ override def run(): Unit = {
+ OperationLog.setCurrentOperationLog(operationLog)
+ executeStatement()
+ }
}
try {
- val sparkSQLSessionManager = session.sessionManager.asInstanceOf[SparkSQLSessionManager]
- val future = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation)
- _backgroundHandle = future
+ val sparkSQLSessionManager = session.sessionManager
+ val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index 0cbb4000d..d5bbe222e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -132,11 +132,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
-
val log = getOperation(opHandle).asInstanceOf[SparkOperation].getOperationLog
- if (log == null) {
- throw KyuubiSQLException(s"Couldn't find log associated with $opHandle")
- }
log.read(maxRows)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala
index a24adbca6..50f899a67 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala
@@ -123,7 +123,7 @@ class OperationLog(path: Path) extends Logging {
throw new KyuubiSQLException(s"Operation[$opHandle] log file $absPath is not found", e)
}
val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0)))
- val tRow = new TRowSet(0, new java.util.ArrayList[TRow](0))
+ val tRow = new TRowSet(0, new java.util.ArrayList[TRow](logs.size()))
tRow.addToColumns(tColumn)
tRow
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 78fe7483a..ccc55e462 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -17,17 +17,12 @@
package org.apache.kyuubi.engine.spark.session
-import java.util.concurrent.{Future, ThreadPoolExecutor, TimeUnit}
-
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
-import org.apache.kyuubi.util.ThreadUtils
/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
@@ -44,34 +39,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val operationManager = new SparkSQLOperationManager()
- private var execPool: ThreadPoolExecutor = _
-
@volatile private var _latestLogoutTime: Long = Long.MaxValue
def latestLogoutTime: Long = _latestLogoutTime
- override def initialize(conf: KyuubiConf): Unit = {
- val poolSize = conf.get(ENGINE_EXEC_POOL_SIZE)
- val waitQueueSize = conf.get(ENGINE_EXEC_WAIT_QUEUE_SIZE)
- val keepAliveMs = conf.get(ENGINE_EXEC_KEEPALIVE_TIME)
- execPool = ThreadUtils.newDaemonQueuedThreadPool(
- poolSize, waitQueueSize, keepAliveMs, s"$name-exec-pool")
- super.initialize(conf)
- }
-
- override def stop(): Unit = {
- if (execPool != null) {
- execPool.shutdown()
- val timeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
- try {
- execPool.awaitTermination(timeout, TimeUnit.SECONDS)
- } catch {
- case e: InterruptedException =>
- warn(s"Exceeded timeout($timeout ms) to wait the exec-pool shutdown gracefully", e)
- }
- }
- super.stop()
- }
-
override def openSession(
protocol: TProtocolVersion,
user: String,
@@ -108,8 +78,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
operationManager.removeSparkSession(sessionHandle)
}
- def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r)
-
private def setModifiableConfig(spark: SparkSession, key: String, value: String): Unit = {
if (spark.conf.isModifiable(key)) {
spark.conf.set(key, value)
@@ -117,4 +85,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
warn(s"Spark config $key is static and will be ignored")
}
}
+
+ override protected def isServer: Boolean = false
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a966c25b9..452233da5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -150,7 +150,7 @@ object KyuubiConf {
}
def buildConf(key: String): ConfigBuilder = {
- new ConfigBuilder("kyuubi." + key).onCreate(register)
+ ConfigBuilder("kyuubi." + key).onCreate(register)
}
val EMBEDDED_ZK_PORT: ConfigEntry[Int] = buildConf("zookeeper.embedded.port")
@@ -362,14 +362,14 @@ object KyuubiConf {
.doc("session timeout, it will be closed when it's not accessed for this duration")
.version("1.0.0")
.timeConf
- .checkValue(_ > Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
+ .checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
.createWithDefault(Duration.ofHours(6).toMillis)
val ENGINE_CHECK_INTERVAL: ConfigEntry[Long] = buildConf("session.engine.check.interval")
.doc("The check interval for engine timeout")
.version("1.0.0")
.timeConf
- .checkValue(_ > Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
+ .checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
.createWithDefault(Duration.ofMinutes(10).toMillis)
val ENGINE_IDLE_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.idle.timeout")
@@ -378,38 +378,68 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMinutes(30L).toMillis)
+ val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
+ buildConf("backend.server.exec.pool.size")
+ .doc("Number of threads in the operation execution thread pool of Kyuubi server")
+ .version("1.0.0")
+ .intConf
+ .createWithDefault(100)
+
val ENGINE_EXEC_POOL_SIZE: ConfigEntry[Int] =
- buildConf("session.engine.backend.pool.size")
- .doc("Number of threads in the operation execution thread pool for SQL engine applications")
+ buildConf("backend.engine.exec.pool.size")
+ .doc("Number of threads in the operation execution thread pool of SQL engine applications")
+ .version("1.0.0")
+ .intConf
+ .createWithDefault(100)
+
+ val SERVER_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
+ buildConf("backend.server.exec.pool.wait.queue.size")
+ .doc("Size of the wait queue for the operation execution thread pool of Kyuubi server")
.version("1.0.0")
.intConf
.createWithDefault(100)
val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
- buildConf("session.engine.backend.pool.wait.queue.size")
+ buildConf("backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
" applications")
.version("1.0.0")
.intConf
.createWithDefault(100)
- val ENGINE_EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
- buildConf("session.engine.backend.pool.keepalive.time")
+ val SERVER_EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
+ buildConf("backend.server.exec.pool.keepalive.time")
.doc("Time(ms) that an idle async thread of the operation execution thread pool will wait" +
- " for a new task to arrive before terminating")
+ " for a new task to arrive before terminating in Kyuubi server")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)
- val ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
- buildConf("session.engine.backend.pool.shutdown.timeout")
- .doc("Timeout(ms) for the operation execution thread pool to terminate")
+ val ENGINE_EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
+ buildConf("backend.engine.exec.pool.keepalive.time")
+ .doc("Time(ms) that an idle async thread of the operation execution thread pool will wait" +
+ " for a new task to arrive before terminating in SQL engine applications")
+ .version("1.0.0")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(60).toMillis)
+
+ val SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
+ buildConf("backend.server.exec.pool.shutdown.timeout")
+ .doc("Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(10).toMillis)
- val ENGINE_LONG_POLLING_TIMEOUT: ConfigEntry[Long] =
- buildConf("session.engine.long.polling.timeout")
+ val ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
+ buildConf("backend.engine.exec.pool.shutdown.timeout")
+ .doc("Timeout(ms) for the operation execution thread pool to terminate in SQL engine" +
+ " applications")
+ .version("1.0.0")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(10).toMillis)
+
+ val OPERATION_STATUS_POLLING_TIMEOUT: ConfigEntry[Long] =
+ buildConf("operation.status.polling.timeout")
.doc("Timeout(ms) for long polling asynchronous running sql query's status")
.version("1.0.0")
.timeConf
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 9e360f07d..ce416fa10 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.operation
import java.time.Duration
+import java.util.concurrent.Future
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
@@ -46,6 +47,14 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false
+ @volatile private var _backgroundHandle: Future[_] = _
+
+ protected def setBackgroundHandle(backgroundHandle: Future[_]): Unit = {
+ _backgroundHandle = backgroundHandle
+ }
+
+ def getBackgroundHandle: Future[_] = _backgroundHandle
+
protected def statement: String = opType.toString
protected def setHasResultSet(hasResultSet: Boolean): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
index b51027dd2..a728e24ea 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.operation
+import java.util.concurrent.Future
+
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -36,6 +38,7 @@ trait Operation {
def getHandle: OperationHandle
def getStatus: OperationStatus
+ def getBackgroundHandle: Future[_]
def shouldRunAsync: Boolean
def isTimedOut: Boolean
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index 676fe97f3..b4b33a8fe 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -17,6 +17,10 @@
package org.apache.kyuubi.service
+import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
+
+import scala.concurrent.CancellationException
+
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.kyuubi.config.KyuubiConf
@@ -30,6 +34,8 @@ import org.apache.kyuubi.session.SessionHandle
abstract class AbstractBackendService(name: String)
extends CompositeService(name) with BackendService {
+ private lazy val timeout = conf.get(KyuubiConf.OPERATION_STATUS_POLLING_TIMEOUT)
+
override def openSession(
protocol: TProtocolVersion,
user: String,
@@ -117,10 +123,22 @@ abstract class AbstractBackendService(name: String)
}
override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
- sessionManager
- .operationManager
- .getOperation(operationHandle)
- .getStatus
+ val operation = sessionManager.operationManager.getOperation(operationHandle)
+ if (operation.shouldRunAsync) {
+ try {
+ operation.getBackgroundHandle.get(timeout, TimeUnit.MILLISECONDS)
+ } catch {
+ case e: TimeoutException =>
+ debug(s"$operationHandle: Long polling timed out, ${e.getMessage}")
+ case e: CancellationException =>
+ debug(s"$operationHandle: The background operation was cancelled, ${e.getMessage}")
+ case e: ExecutionException =>
+ debug(s"$operationHandle: The background operation was aborted, ${e.getMessage}")
+ case _: InterruptedException =>
+ }
+ }
+ operation.getStatus
+
}
override def cancelOperation(operationHandle: OperationHandle): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 7f5d50ead..8f6dea6e6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.session
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
@@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.OperationManager
import org.apache.kyuubi.service.CompositeService
import org.apache.kyuubi.util.ThreadUtils
@@ -44,6 +45,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
private val timeoutChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
+ protected def isServer: Boolean
+
+ private var execPool: ThreadPoolExecutor = _
+
+ def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r)
+
def operationManager: OperationManager
def openSession(
@@ -77,6 +84,26 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
override def initialize(conf: KyuubiConf): Unit = {
addService(operationManager)
+
+ val poolSize: Int = if (isServer) {
+ conf.get(SERVER_EXEC_POOL_SIZE)
+ } else {
+ conf.get(ENGINE_EXEC_POOL_SIZE)
+ }
+
+ val waitQueueSize: Int = if (isServer) {
+ conf.get(SERVER_EXEC_WAIT_QUEUE_SIZE)
+ } else {
+ conf.get(ENGINE_EXEC_WAIT_QUEUE_SIZE)
+ }
+ val keepAliveMs: Long = if (isServer) {
+ conf.get(SERVER_EXEC_KEEPALIVE_TIME)
+ } else {
+ conf.get(ENGINE_EXEC_KEEPALIVE_TIME)
+ }
+
+ execPool = ThreadUtils.newDaemonQueuedThreadPool(
+ poolSize, waitQueueSize, keepAliveMs, s"$name-exec-pool")
super.initialize(conf)
}
@@ -88,13 +115,29 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
override def stop(): Unit = {
super.stop()
shutdown = true
+ val shutdownTimeout: Long = if (isServer) {
+ conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
+ } else {
+ conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
+ }
timeoutChecker.shutdown()
try {
- timeoutChecker.awaitTermination(10, TimeUnit.SECONDS)
+ timeoutChecker.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
} catch {
case i: InterruptedException =>
warn(s"Exceeded to shutdown session timeout checker ", i)
}
+
+ if (execPool != null) {
+ execPool.shutdown()
+ try {
+ execPool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
+ } catch {
+ case e: InterruptedException =>
+ warn(s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully",
+ e)
+ }
+ }
}
private def startTimeoutChecker(): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/SignalRegister.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/SignalRegister.scala
index 75299b910..5bbfc2302 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/SignalRegister.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/SignalRegister.scala
@@ -33,13 +33,14 @@ object SignalRegister extends Logging {
def registerLogger(log: Logger): Unit = {
Seq("TERM", "HUP", "INT").foreach { sig =>
if (SystemUtils.IS_OS_UNIX) {
+ val signal = new Signal(sig)
try {
val handler = handlers.getOrElseUpdate(sig, {
info(s"Registering signal handler for $sig")
- ActionHandler(new Signal(sig))
+ ActionHandler(signal)
})
handler.register({
- log.error("RECEIVED SIGNAL " + sig)
+ log.error(s"RECEIVED SIGNAL ${signal.getNumber}: " + sig)
false
})
} catch {
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
index 4c981b67d..11f10fc91 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
@@ -38,4 +38,6 @@ class NoopSessionManager extends SessionManager("noop") {
setSession(session.handle, session)
session.handle
}
+
+ override protected def isServer: Boolean = true
}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index ef66d477b..58c449f17 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.operation
+import java.util.concurrent.RejectedExecutionException
+
import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TGetOperationStatusReq, TSessionHandle}
import org.apache.hive.service.rpc.thrift.TOperationState._
@@ -32,6 +34,8 @@ class ExecuteStatement(
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
+ private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
+
override def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.PENDING)
@@ -39,35 +43,82 @@ class ExecuteStatement(
override protected def afterRun(): Unit = {}
- override protected def runInternal(): Unit = {
+ private def executeStatement(): Unit = {
try {
val req = new TExecuteStatementReq(remoteSessionHandle, statement)
req.setRunAsync(shouldRunAsync)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
- if (shouldRunAsync) {
- // TODO we should do this asynchronous too, not just block this
- var isComplete = false
- val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
- var statusResp = client.GetOperationStatus(statusReq)
- while(!isComplete) {
- verifyTStatus(statusResp.getStatus)
- statusResp.getOperationState match {
- case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
- statusResp = client.GetOperationStatus(statusReq)
- case state @ (FINISHED_STATE | CLOSED_STATE | CANCELED_STATE | TIMEDOUT_STATE) =>
- isComplete = true
- setState(state)
- case ERROR_STATE =>
- throw KyuubiSQLException(statusResp.getErrorMessage)
- case _ =>
- throw KyuubiSQLException(s"UNKNOWN STATE for $statement")
- }
- }
- } else {
- setState(OperationState.FINISHED)
- }
} catch onError()
}
+
+ private def waitStatementComplete(): Unit = {
+ setState(OperationState.RUNNING)
+ var statusResp = client.GetOperationStatus(statusReq)
+ var isComplete = false
+ while (!isComplete) {
+ verifyTStatus(statusResp.getStatus)
+ val remoteState = statusResp.getOperationState
+ info(s"Query[$statementId] in ${remoteState.name()}")
+ remoteState match {
+ case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
+ statusResp = client.GetOperationStatus(statusReq)
+
+ case FINISHED_STATE =>
+ setState(OperationState.FINISHED)
+ isComplete = true
+
+ case CLOSED_STATE =>
+ setState(OperationState.CLOSED)
+ isComplete = true
+
+ case CANCELED_STATE =>
+ setState(OperationState.CANCELED)
+ isComplete = true
+
+ case TIMEDOUT_STATE =>
+ setState(OperationState.TIMEOUT)
+ isComplete = true
+
+ case ERROR_STATE =>
+ setState(OperationState.ERROR)
+ val ke = KyuubiSQLException(statusResp.getErrorMessage)
+ setOperationException(ke)
+ throw ke
+
+ case UKNOWN_STATE =>
+ setState(OperationState.ERROR)
+ val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
+ setOperationException(ke)
+ throw ke
+ }
+ }
+ }
+
+ override protected def runInternal(): Unit = {
+ if (shouldRunAsync) {
+ executeStatement()
+ val sessionManager = session.sessionManager
+ val asyncOperation = new Runnable {
+ override def run(): Unit = waitStatementComplete()
+ }
+ try {
+ val backgroundOperation =
+ sessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(backgroundOperation)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR)
+ val ke = KyuubiSQLException("Error submitting query in background, query rejected",
+ rejected)
+ setOperationException(ke)
+ throw ke
+ }
+ } else {
+ setState(OperationState.RUNNING)
+ executeStatement()
+ setState(OperationState.FINISHED)
+ }
+ }
}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 6bb75bedd..1859c1930 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -120,6 +120,7 @@ abstract class KyuubiOperation(
val req = new TFetchResultsReq(
_remoteOpHandle, FetchOrientation.toTFetchOrientation(order), rowSetSize)
val resp = client.FetchResults(req)
+ verifyTStatus(resp.getStatus)
resp.getResults
}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index ce84382f2..15d8f714f 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -149,18 +149,19 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val client = getThriftClient(operation.getSession.handle)
+ // TODO:(kentyao): In async mode, if we query log like below, will put very heavy load on engine
+ // side and get thrift err like:
+ // org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)!
val tOperationHandle = operation.remoteOpHandle()
if (tOperationHandle == null) {
new TRowSet(0, new java.util.ArrayList[TRow](0))
} else {
- val req = new TFetchResultsReq()
- req.setOperationHandle(tOperationHandle)
- req.setOrientation(FetchOrientation.toTFetchOrientation(order))
- req.setMaxRows(maxRows)
- req.setFetchType(1)
- val resp = client.FetchResults(req)
- resp.getResults
+// val orientation = FetchOrientation.toTFetchOrientation(order)
+// val req = new TFetchResultsReq(tOperationHandle, orientation, maxRows)
+// req.setFetchType(1.toShort)
+// val resp = client.FetchResults(req)
+// resp.getResults
+ new TRowSet(0, new java.util.ArrayList[TRow](0))
}
-
}
}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index e77f8d66b..270a6f9a7 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,8 +17,6 @@
package org.apache.kyuubi.session
-import scala.util.control.NonFatal
-
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
@@ -68,4 +66,6 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
e)
}
}
+
+ override protected def isServer: Boolean = true
}
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala
index ccdb70313..b041b1fa8 100644
--- a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationSuite.scala
@@ -22,8 +22,17 @@ import org.apache.kyuubi.server.KyuubiServer
class KyuubiOperationSuite extends JDBCTests {
- private val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)
+ private val conf = KyuubiConf()
+ .set(KyuubiConf.FRONTEND_BIND_PORT, 0)
+ .set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
+ .set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
+
private val server: KyuubiServer = KyuubiServer.startServer(conf)
+ override def afterAll(): Unit = {
+ server.stop()
+ super.afterAll()
+ }
+
override protected def jdbcUrl: String = s"jdbc:hive2://${server.connectionUrl}/;"
}