[KYUUBI #3884] Execute scala code supports asynchronous and query timeout

### _Why are the changes needed?_
Execute scala code now does not support async and query timeout.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3884 from cxzl25/async_scala.

Closes #3884

d231f682 [sychen] Execute scala code supports asynchronous and timeout

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
sychen 2022-12-06 08:10:45 +08:00 committed by fwang12
parent 730fd57ccf
commit 2f6242091f
3 changed files with 67 additions and 5 deletions

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.spark.operation
import java.io.File
import java.util.concurrent.RejectedExecutionException
import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
@ -28,8 +29,9 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
import org.apache.kyuubi.operation.ArrayFetchIterator
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@ -47,7 +49,9 @@ import org.apache.kyuubi.session.Session
class ExecuteScala(
session: Session,
repl: KyuubiSparkILoop,
override val statement: String)
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends SparkOperation(session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
@ -61,9 +65,20 @@ class ExecuteScala(
}
}
override protected def runInternal(): Unit = withLocalProperties {
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.PENDING)
setHasResultSet(true)
}
override protected def afterRun(): Unit = {
OperationLog.removeCurrentOperationLog()
}
private def executeScala(): Unit = withLocalProperties {
try {
OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.RUNNING)
info(diagnostics)
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
val legacyOutput = repl.getOutput
if (legacyOutput.nonEmpty) {
@ -104,10 +119,39 @@ class ExecuteScala(
case Incomplete =>
throw KyuubiSQLException(s"Incomplete code:\n$statement")
}
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
repl.clearResult(statementId)
shutdownTimeoutMonitor()
}
}
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
executeScala()
}
}
try {
val sparkSQLSessionManager = session.sessionManager
val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke =
KyuubiSQLException("Error submitting scala in background", rejected)
setOperationException(ke)
throw ke
}
} else {
executeScala()
}
}
}

View File

@ -88,7 +88,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
}
case OperationLanguages.SCALA =>
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
new ExecuteScala(session, repl, statement)
new ExecuteScala(session, repl, statement, runAsync, queryTimeout)
case OperationLanguages.PYTHON =>
try {
ExecutePython.init()

View File

@ -17,6 +17,7 @@
package org.apache.kyuubi.jdbc
import java.sql.SQLTimeoutException
import java.util.Properties
import org.apache.kyuubi.IcebergSuiteMixin
@ -103,6 +104,23 @@ class KyuubiHiveDriverSuite extends WithSparkSQLEngine with IcebergSuiteMixin {
}
}
test("executeScala support timeout") {
val driver = new KyuubiHiveDriver()
val connection = driver.connect(getJdbcUrl, new Properties())
val statement = connection.createStatement().asInstanceOf[KyuubiStatement]
statement.setQueryTimeout(5)
try {
val code = """java.lang.Thread.sleep(500000L)"""
val e = intercept[SQLTimeoutException] {
statement.executeScala(code)
}.getMessage
assert(e.contains("Query timed out"))
} finally {
statement.close()
connection.close()
}
}
test("wrapable KyuubiConnection") {
val driver = new KyuubiHiveDriver()
val connection = driver.connect(getJdbcUrl, new Properties())