[KYUUBI #1489] Support operation log for ExecuteScala
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1518 from yaooqinn/1498. Closes #1489 890d1561 [Kent Yao] [KYUUBI #1489] Support operation log for ExecuteScala Authored-by: Kent Yao <yao@apache.org> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
parent
627d66b171
commit
fe31c3b62f
@ -26,6 +26,7 @@ import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.engine.spark.ArrayFetchIterator
|
||||
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
|
||||
import org.apache.kyuubi.operation.OperationType
|
||||
import org.apache.kyuubi.operation.log.OperationLog
|
||||
import org.apache.kyuubi.session.Session
|
||||
|
||||
/**
|
||||
@ -45,6 +46,9 @@ class ExecuteScala(
|
||||
override val statement: String)
|
||||
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
|
||||
|
||||
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
|
||||
override def getOperationLog: Option[OperationLog] = Option(operationLog)
|
||||
|
||||
override protected def resultSchema: StructType = {
|
||||
if (result == null || result.schema.isEmpty) {
|
||||
new StructType().add("output", "string")
|
||||
@ -55,6 +59,7 @@ class ExecuteScala(
|
||||
|
||||
override protected def runInternal(): Unit = {
|
||||
try {
|
||||
OperationLog.setCurrentOperationLog(operationLog)
|
||||
spark.sparkContext.setJobGroup(statementId, statement)
|
||||
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
|
||||
repl.interpretWithRedirectOutError(statement) match {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user