Support asynchronous exec for Kyuubi Server

This commit is contained in:
Kent Yao 2020-11-24 10:55:29 +08:00
parent 9ca23bcf20
commit f8ef7c8997
20 changed files with 268 additions and 136 deletions

View File

@ -24,6 +24,7 @@
#
# spark.master local
# spark.ui.enabled false
# spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
## Hadoop Configurations, they will override those in $HADOOP_CONF_DIR
#

View File

@ -90,6 +90,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
#
# spark.master local
# spark.ui.enabled false
# spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
## Hadoop Configurations, they will override those in $HADOOP_CONF_DIR
#
@ -106,6 +107,19 @@ kyuubi\.authentication<br>\.ldap\.domain|<div style='width: 80pt;word-wrap: brea
kyuubi\.authentication<br>\.ldap\.url|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>SPACE character separated LDAP connection URL(s).</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.authentication<br>\.sasl\.qop|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>auth</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Sasl QOP enable higher levels of protection for Kyuubi communication with clients.<ul> <li>auth - authentication only (default)</li> <li>auth-int - authentication plus integrity protection</li> <li>auth-conf - authentication plus integrity and confidentiality protection. This is applicable only if Kyuubi is configured to use Kerberos authentication.</li> </ul></div>|<div style='width: 20pt'>1.0.0</div>
### Backend
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.backend\.engine<br>\.exec\.pool\.keepalive<br>\.time|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.engine<br>\.exec\.pool\.shutdown<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT10S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.engine<br>\.exec\.pool\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Number of threads in the operation execution thread pool of SQL engine applications</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.engine<br>\.exec\.pool\.wait\.queue<br>\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool in SQL engine applications</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.server<br>\.exec\.pool\.keepalive<br>\.time|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.server<br>\.exec\.pool\.shutdown<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT10S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.server<br>\.exec\.pool\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Number of threads in the operation execution thread pool of Kyuubi server</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.backend\.server<br>\.exec\.pool\.wait\.queue<br>\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool of Kyuubi server</div>|<div style='width: 20pt'>1.0.0</div>
### Delegation
Key | Default | Meaning | Since
@ -156,21 +170,17 @@ kyuubi\.kinit<br>\.principal|<div style='width: 80pt;word-wrap: break-word;white
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 20pt'>1.0.0</div>
### Session
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.session\.check<br>\.interval|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The check interval for session timeout.</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.backend\.pool<br>\.keepalive\.time|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.backend\.pool\.shutdown<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT10S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for the operation execution thread pool to terminate</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.backend\.pool\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Number of threads in the operation execution thread pool for SQL engine applications</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.backend\.pool\.wait<br>\.queue\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool in SQL engine applications</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.check\.interval|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT10M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The check interval for engine timeout</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT15S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The timeout(ms) of creating the connection to remote sql query engine</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.long\.polling\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.spark\.main\.resource|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT6H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>session timeout, it will be closed when it's not accessed for this duration</div>|<div style='width: 20pt'>1.0.0</div>

View File

@ -9,6 +9,8 @@
You can use the [Java Debug Wire Protocol](https://docs.oracle.com/javase/8/docs/technotes/guides/jpda/conninv.html#Plugin) to debug Kyuubi
with your favorite IDE, e.g. IntelliJ IDEA.
## Debugging Server
We can configure the JDWP agent in `KYUUBI_JAVA_OPTS` for debugging.
@ -24,3 +26,16 @@ In the IDE, you set the corresponding parameters(host&port) in debug configurati
![](../imgs/idea_debug.png)
</div>
## Debugging Apps
- Spark Driver
```bash
spark.driver.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
```
- Spark Executor
```bash
spark.executor.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
```

View File

@ -17,16 +17,10 @@
package org.apache.kyuubi.engine.spark
import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
import scala.concurrent.CancellationException
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.operation.ExecuteStatement
import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationStatus}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.service.BackendService
import org.apache.kyuubi.session.Session
@ -42,29 +36,7 @@ import org.apache.kyuubi.session.SessionManager
class SparkSQLBackendService(name: String, spark: SparkSession)
extends AbstractBackendService(name) {
private lazy val timeout = conf.get(KyuubiConf.ENGINE_LONG_POLLING_TIMEOUT)
def this(spark: SparkSession) = this(classOf[SparkSQLBackendService].getSimpleName, spark)
override val sessionManager: SessionManager = new SparkSQLSessionManager(spark)
override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
val operation = sessionManager.operationManager.getOperation(operationHandle)
operation match {
case es: ExecuteStatement if es.shouldRunAsync =>
try {
es.backgroundHandle.get(timeout, TimeUnit.MILLISECONDS)
} catch {
case e: TimeoutException =>
debug(s"$operationHandle: Long polling timed out, ${e.getMessage}")
case e: CancellationException =>
debug(s"$operationHandle: The background operation was cancelled, ${e.getMessage}")
case e: ExecutionException =>
debug(s"$operationHandle: The background operation was aborted, ${e.getMessage}")
case _: InterruptedException =>
}
case _ =>
}
super.getOperationStatus(operationHandle)
}
}

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.{Future, RejectedExecutionException}
import java.util.concurrent.RejectedExecutionException
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType
@ -25,7 +25,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.session.Session
@ -37,8 +36,6 @@ class ExecuteStatement(
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
private var result: DataFrame = _
@volatile private var _backgroundHandle: Future[_] = _
def backgroundHandle: Future[_] = _backgroundHandle
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@ -49,6 +46,7 @@ class ExecuteStatement(
}
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.PENDING)
setHasResultSet(true)
}
@ -59,7 +57,6 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
@ -78,13 +75,16 @@ class ExecuteStatement(
override protected def runInternal(): Unit = {
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = executeStatement()
override def run(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
executeStatement()
}
}
try {
val sparkSQLSessionManager = session.sessionManager.asInstanceOf[SparkSQLSessionManager]
val future = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation)
_backgroundHandle = future
val sparkSQLSessionManager = session.sessionManager
val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)

View File

@ -132,11 +132,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
val log = getOperation(opHandle).asInstanceOf[SparkOperation].getOperationLog
if (log == null) {
throw KyuubiSQLException(s"Couldn't find log associated with $opHandle")
}
log.read(maxRows)
}
}

View File

@ -123,7 +123,7 @@ class OperationLog(path: Path) extends Logging {
throw new KyuubiSQLException(s"Operation[$opHandle] log file $absPath is not found", e)
}
val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0)))
val tRow = new TRowSet(0, new java.util.ArrayList[TRow](0))
val tRow = new TRowSet(0, new java.util.ArrayList[TRow](logs.size()))
tRow.addToColumns(tColumn)
tRow
}

View File

@ -17,17 +17,12 @@
package org.apache.kyuubi.engine.spark.session
import java.util.concurrent.{Future, ThreadPoolExecutor, TimeUnit}
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.util.ThreadUtils
/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
@ -44,34 +39,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val operationManager = new SparkSQLOperationManager()
private var execPool: ThreadPoolExecutor = _
@volatile private var _latestLogoutTime: Long = Long.MaxValue
def latestLogoutTime: Long = _latestLogoutTime
override def initialize(conf: KyuubiConf): Unit = {
val poolSize = conf.get(ENGINE_EXEC_POOL_SIZE)
val waitQueueSize = conf.get(ENGINE_EXEC_WAIT_QUEUE_SIZE)
val keepAliveMs = conf.get(ENGINE_EXEC_KEEPALIVE_TIME)
execPool = ThreadUtils.newDaemonQueuedThreadPool(
poolSize, waitQueueSize, keepAliveMs, s"$name-exec-pool")
super.initialize(conf)
}
override def stop(): Unit = {
if (execPool != null) {
execPool.shutdown()
val timeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
try {
execPool.awaitTermination(timeout, TimeUnit.SECONDS)
} catch {
case e: InterruptedException =>
warn(s"Exceeded timeout($timeout ms) to wait the exec-pool shutdown gracefully", e)
}
}
super.stop()
}
override def openSession(
protocol: TProtocolVersion,
user: String,
@ -108,8 +78,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
operationManager.removeSparkSession(sessionHandle)
}
def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r)
private def setModifiableConfig(spark: SparkSession, key: String, value: String): Unit = {
if (spark.conf.isModifiable(key)) {
spark.conf.set(key, value)
@ -117,4 +85,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
warn(s"Spark config $key is static and will be ignored")
}
}
override protected def isServer: Boolean = false
}

View File

@ -150,7 +150,7 @@ object KyuubiConf {
}
def buildConf(key: String): ConfigBuilder = {
new ConfigBuilder("kyuubi." + key).onCreate(register)
ConfigBuilder("kyuubi." + key).onCreate(register)
}
val EMBEDDED_ZK_PORT: ConfigEntry[Int] = buildConf("zookeeper.embedded.port")
@ -362,14 +362,14 @@ object KyuubiConf {
.doc("session timeout, it will be closed when it's not accessed for this duration")
.version("1.0.0")
.timeConf
.checkValue(_ > Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
.checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
.createWithDefault(Duration.ofHours(6).toMillis)
val ENGINE_CHECK_INTERVAL: ConfigEntry[Long] = buildConf("session.engine.check.interval")
.doc("The check interval for engine timeout")
.version("1.0.0")
.timeConf
.checkValue(_ > Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
.checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
.createWithDefault(Duration.ofMinutes(10).toMillis)
val ENGINE_IDLE_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.idle.timeout")
@ -378,38 +378,68 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMinutes(30L).toMillis)
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
buildConf("backend.server.exec.pool.size")
.doc("Number of threads in the operation execution thread pool of Kyuubi server")
.version("1.0.0")
.intConf
.createWithDefault(100)
val ENGINE_EXEC_POOL_SIZE: ConfigEntry[Int] =
buildConf("session.engine.backend.pool.size")
.doc("Number of threads in the operation execution thread pool for SQL engine applications")
buildConf("backend.engine.exec.pool.size")
.doc("Number of threads in the operation execution thread pool of SQL engine applications")
.version("1.0.0")
.intConf
.createWithDefault(100)
val SERVER_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("backend.server.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool of Kyuubi server")
.version("1.0.0")
.intConf
.createWithDefault(100)
val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("session.engine.backend.pool.wait.queue.size")
buildConf("backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
" applications")
.version("1.0.0")
.intConf
.createWithDefault(100)
val ENGINE_EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("session.engine.backend.pool.keepalive.time")
val SERVER_EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("backend.server.exec.pool.keepalive.time")
.doc("Time(ms) that an idle async thread of the operation execution thread pool will wait" +
" for a new task to arrive before terminating")
" for a new task to arrive before terminating in Kyuubi server")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)
val ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("session.engine.backend.pool.shutdown.timeout")
.doc("Timeout(ms) for the operation execution thread pool to terminate")
val ENGINE_EXEC_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("backend.engine.exec.pool.keepalive.time")
.doc("Time(ms) that an idle async thread of the operation execution thread pool will wait" +
" for a new task to arrive before terminating in SQL engine applications")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)
val SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("backend.server.exec.pool.shutdown.timeout")
.doc("Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(10).toMillis)
val ENGINE_LONG_POLLING_TIMEOUT: ConfigEntry[Long] =
buildConf("session.engine.long.polling.timeout")
val ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("backend.engine.exec.pool.shutdown.timeout")
.doc("Timeout(ms) for the operation execution thread pool to terminate in SQL engine" +
" applications")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(10).toMillis)
val OPERATION_STATUS_POLLING_TIMEOUT: ConfigEntry[Long] =
buildConf("operation.status.polling.timeout")
.doc("Timeout(ms) for long polling asynchronous running sql query's status")
.version("1.0.0")
.timeConf

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.operation
import java.time.Duration
import java.util.concurrent.Future
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
@ -46,6 +47,14 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false
// Handle of the task submitted for asynchronous execution of this operation.
// The `= _` initializer leaves it null until setBackgroundHandle is called.
@volatile private var _backgroundHandle: Future[_] = _
/**
 * Records the handle of the background task that runs this operation.
 *
 * @param backgroundHandle the future to store for later retrieval
 */
protected def setBackgroundHandle(backgroundHandle: Future[_]): Unit = {
_backgroundHandle = backgroundHandle
}
/**
 * Returns the stored background handle, or null if none has been set yet.
 */
def getBackgroundHandle: Future[_] = _backgroundHandle
protected def statement: String = opType.toString
protected def setHasResultSet(hasResultSet: Boolean): Unit = {

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.operation
import java.util.concurrent.Future
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -36,6 +38,7 @@ trait Operation {
def getHandle: OperationHandle
def getStatus: OperationStatus
def getBackgroundHandle: Future[_]
def shouldRunAsync: Boolean
def isTimedOut: Boolean

View File

@ -17,6 +17,10 @@
package org.apache.kyuubi.service
import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
import scala.concurrent.CancellationException
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.kyuubi.config.KyuubiConf
@ -30,6 +34,8 @@ import org.apache.kyuubi.session.SessionHandle
abstract class AbstractBackendService(name: String)
extends CompositeService(name) with BackendService {
private lazy val timeout = conf.get(KyuubiConf.OPERATION_STATUS_POLLING_TIMEOUT)
override def openSession(
protocol: TProtocolVersion,
user: String,
@ -117,10 +123,22 @@ abstract class AbstractBackendService(name: String)
}
/**
 * Returns the status of the operation identified by `operationHandle`.
 *
 * For an operation that runs asynchronously this long-polls: it waits on the
 * operation's background handle for at most `timeout` milliseconds before
 * reading the status, so a client polling in a loop does not spin.
 *
 * @param operationHandle handle of the operation to look up
 * @return the operation's current status
 */
override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
  val operation = sessionManager.operationManager.getOperation(operationHandle)
  if (operation.shouldRunAsync) {
    // The background handle is null until the operation has been submitted to the
    // exec pool (and stays null if the submission was rejected), so guard against it
    // instead of failing the status call with a NullPointerException.
    Option(operation.getBackgroundHandle).foreach { handle =>
      try {
        handle.get(timeout, TimeUnit.MILLISECONDS)
      } catch {
        case e: TimeoutException =>
          debug(s"$operationHandle: Long polling timed out, ${e.getMessage}")
        case e: CancellationException =>
          debug(s"$operationHandle: The background operation was cancelled, ${e.getMessage}")
        case e: ExecutionException =>
          debug(s"$operationHandle: The background operation was aborted, ${e.getMessage}")
        // Best effort: an interrupted wait still falls through to report the status.
        case _: InterruptedException =>
      }
    }
  }
  operation.getStatus
}
override def cancelOperation(operationHandle: OperationHandle): Unit = {

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.session
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.OperationManager
import org.apache.kyuubi.service.CompositeService
import org.apache.kyuubi.util.ThreadUtils
@ -44,6 +45,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
private val timeoutChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
protected def isServer: Boolean
private var execPool: ThreadPoolExecutor = _
def submitBackgroundOperation(r: Runnable): Future[_] = execPool.submit(r)
def operationManager: OperationManager
def openSession(
@ -77,6 +84,26 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
/**
 * Registers the operation manager as a child service and creates the thread pool
 * used for background operations before delegating to the parent initialization.
 *
 * Pool size, wait-queue size and keep-alive time are read from the `SERVER_EXEC_*`
 * entries when `isServer` is true and from the `ENGINE_EXEC_*` entries otherwise.
 *
 * @param conf the configuration to read the pool settings from
 */
override def initialize(conf: KyuubiConf): Unit = {
  addService(operationManager)

  val forServer = isServer
  val threads: Int =
    conf.get(if (forServer) SERVER_EXEC_POOL_SIZE else ENGINE_EXEC_POOL_SIZE)
  val queueCapacity: Int =
    conf.get(if (forServer) SERVER_EXEC_WAIT_QUEUE_SIZE else ENGINE_EXEC_WAIT_QUEUE_SIZE)
  val idleKeepAliveMs: Long =
    conf.get(if (forServer) SERVER_EXEC_KEEPALIVE_TIME else ENGINE_EXEC_KEEPALIVE_TIME)

  execPool = ThreadUtils.newDaemonQueuedThreadPool(
    threads, queueCapacity, idleKeepAliveMs, s"$name-exec-pool")
  super.initialize(conf)
}
@ -88,13 +115,29 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
/**
 * Stops this session manager: stops the parent service, marks the manager as shut
 * down, then shuts down the timeout checker and the background exec pool, waiting
 * up to the configured shutdown timeout (milliseconds) for each to terminate.
 */
override def stop(): Unit = {
  super.stop()
  shutdown = true
  // Use the timeout that belongs to this side: the server reads the SERVER_* entry,
  // an engine reads the ENGINE_* entry (the two branches were previously swapped).
  val shutdownTimeout: Long = if (isServer) {
    conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
  } else {
    conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
  }

  timeoutChecker.shutdown()
  try {
    // Single wait bounded by the configured timeout; the former hard-coded
    // 10 second wait in front of it is no longer needed.
    timeoutChecker.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
  } catch {
    case i: InterruptedException =>
      warn(s"Exceeded to shutdown session timeout checker ", i)
  }

  // execPool is only assigned in initialize, so it may still be null here.
  if (execPool != null) {
    execPool.shutdown()
    try {
      execPool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
    } catch {
      case e: InterruptedException =>
        warn(s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully",
          e)
    }
  }
}
private def startTimeoutChecker(): Unit = {

View File

@ -33,13 +33,14 @@ object SignalRegister extends Logging {
def registerLogger(log: Logger): Unit = {
Seq("TERM", "HUP", "INT").foreach { sig =>
if (SystemUtils.IS_OS_UNIX) {
val signal = new Signal(sig)
try {
val handler = handlers.getOrElseUpdate(sig, {
info(s"Registering signal handler for $sig")
ActionHandler(new Signal(sig))
ActionHandler(signal)
})
handler.register({
log.error("RECEIVED SIGNAL " + sig)
log.error(s"RECEIVED SIGNAL ${signal.getNumber}: " + sig)
false
})
} catch {

View File

@ -38,4 +38,6 @@ class NoopSessionManager extends SessionManager("noop") {
setSession(session.handle, session)
session.handle
}
override protected def isServer: Boolean = true
}

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.operation
import java.util.concurrent.RejectedExecutionException
import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TGetOperationStatusReq, TSessionHandle}
import org.apache.hive.service.rpc.thrift.TOperationState._
@ -32,6 +34,8 @@ class ExecuteStatement(
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
override def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.PENDING)
@ -39,35 +43,82 @@ class ExecuteStatement(
override protected def afterRun(): Unit = {}
override protected def runInternal(): Unit = {
private def executeStatement(): Unit = {
try {
val req = new TExecuteStatementReq(remoteSessionHandle, statement)
req.setRunAsync(shouldRunAsync)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
if (shouldRunAsync) {
// TODO we should do this asynchronous too, not just block this
var isComplete = false
val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
var statusResp = client.GetOperationStatus(statusReq)
while(!isComplete) {
verifyTStatus(statusResp.getStatus)
statusResp.getOperationState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
statusResp = client.GetOperationStatus(statusReq)
case state @ (FINISHED_STATE | CLOSED_STATE | CANCELED_STATE | TIMEDOUT_STATE) =>
isComplete = true
setState(state)
case ERROR_STATE =>
throw KyuubiSQLException(statusResp.getErrorMessage)
case _ =>
throw KyuubiSQLException(s"UNKNOWN STATE for $statement")
}
}
} else {
setState(OperationState.FINISHED)
}
} catch onError()
}
/**
 * Marks this operation as RUNNING and then repeatedly asks the remote side for the
 * status of the remote operation until it reports a terminal state, mirroring that
 * state onto this operation.
 *
 * A remote ERROR or UKNOWN state sets this operation to ERROR, records a
 * [[KyuubiSQLException]] as the operation exception and throws it.
 */
private def waitStatementComplete(): Unit = {
  // Shared failure path; the message is by-name so it is built after the state change.
  def failWith(message: => String): Nothing = {
    setState(OperationState.ERROR)
    val ke = KyuubiSQLException(message)
    setOperationException(ke)
    throw ke
  }

  setState(OperationState.RUNNING)
  var response = client.GetOperationStatus(statusReq)
  var stillWaiting = true
  while (stillWaiting) {
    verifyTStatus(response.getStatus)
    val remoteState = response.getOperationState
    info(s"Query[$statementId] in ${remoteState.name()}")
    remoteState match {
      // Not finished yet: ask again.
      case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
        response = client.GetOperationStatus(statusReq)

      case FINISHED_STATE =>
        setState(OperationState.FINISHED)
        stillWaiting = false

      case CLOSED_STATE =>
        setState(OperationState.CLOSED)
        stillWaiting = false

      case CANCELED_STATE =>
        setState(OperationState.CANCELED)
        stillWaiting = false

      case TIMEDOUT_STATE =>
        setState(OperationState.TIMEOUT)
        stillWaiting = false

      case ERROR_STATE =>
        failWith(response.getErrorMessage)

      case UKNOWN_STATE =>
        failWith(s"UNKNOWN STATE for $statement")
    }
  }
}
/**
 * Runs the statement.
 *
 * Synchronous mode: sets the state to RUNNING, calls `executeStatement()` and then
 * sets the state to FINISHED.
 *
 * Asynchronous mode: calls `executeStatement()` and then submits a task that calls
 * `waitStatementComplete()` to the session manager, storing the returned future as
 * this operation's background handle. If the submission is rejected, the state is
 * set to ERROR and a [[KyuubiSQLException]] is recorded and thrown.
 */
override protected def runInternal(): Unit = {
  if (!shouldRunAsync) {
    setState(OperationState.RUNNING)
    executeStatement()
    setState(OperationState.FINISHED)
  } else {
    executeStatement()
    val manager = session.sessionManager
    val statusPoller = new Runnable {
      override def run(): Unit = waitStatementComplete()
    }
    try {
      setBackgroundHandle(manager.submitBackgroundOperation(statusPoller))
    } catch {
      case rejected: RejectedExecutionException =>
        setState(OperationState.ERROR)
        val ke = KyuubiSQLException("Error submitting query in background, query rejected",
          rejected)
        setOperationException(ke)
        throw ke
    }
  }
}
}

View File

@ -120,6 +120,7 @@ abstract class KyuubiOperation(
val req = new TFetchResultsReq(
_remoteOpHandle, FetchOrientation.toTFetchOrientation(order), rowSetSize)
val resp = client.FetchResults(req)
verifyTStatus(resp.getStatus)
resp.getResults
}

View File

@ -149,18 +149,19 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val client = getThriftClient(operation.getSession.handle)
// TODO:(kentyao): In async mode, if we query log like below, will put very heavy load on engine
// side and get thrift err like:
// org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)!
val tOperationHandle = operation.remoteOpHandle()
if (tOperationHandle == null) {
new TRowSet(0, new java.util.ArrayList[TRow](0))
} else {
val req = new TFetchResultsReq()
req.setOperationHandle(tOperationHandle)
req.setOrientation(FetchOrientation.toTFetchOrientation(order))
req.setMaxRows(maxRows)
req.setFetchType(1)
val resp = client.FetchResults(req)
resp.getResults
// val orientation = FetchOrientation.toTFetchOrientation(order)
// val req = new TFetchResultsReq(tOperationHandle, orientation, maxRows)
// req.setFetchType(1.toShort)
// val resp = client.FetchResults(req)
// resp.getResults
new TRowSet(0, new java.util.ArrayList[TRow](0))
}
}
}

View File

@ -17,8 +17,6 @@
package org.apache.kyuubi.session
import scala.util.control.NonFatal
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
@ -68,4 +66,6 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
e)
}
}
override protected def isServer: Boolean = true
}

View File

@ -22,8 +22,17 @@ import org.apache.kyuubi.server.KyuubiServer
class KyuubiOperationSuite extends JDBCTests {
private val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)
private val conf = KyuubiConf()
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
.set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
private val server: KyuubiServer = KyuubiServer.startServer(conf)
override def afterAll(): Unit = {
server.stop()
super.afterAll()
}
override protected def jdbcUrl: String = s"jdbc:hive2://${server.connectionUrl}/;"
}