From fe31c3b62ffc1d65aad4f2816b616a9d64cc38c8 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 7 Dec 2021 18:22:41 +0800 Subject: [PATCH] [KYUUBI #1489] Support operation log for ExecuteScala ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1518 from yaooqinn/1498. Closes #1489 890d1561 [Kent Yao] [KYUUBI #1489] Support operation log for ExecuteScala Authored-by: Kent Yao Signed-off-by: ulysses-you --- .../apache/kyuubi/engine/spark/operation/ExecuteScala.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index 0278cc6e5..d14f7390e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -26,6 +26,7 @@ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.engine.spark.ArrayFetchIterator import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop import org.apache.kyuubi.operation.OperationType +import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session /** @@ -45,6 +46,9 @@ class ExecuteScala( override val statement: String) extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) { + private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) + override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { new StructType().add("output", "string") @@ -55,6 +59,7 @@ class ExecuteScala( override protected def runInternal(): Unit = { try { + OperationLog.setCurrentOperationLog(operationLog) spark.sparkContext.setJobGroup(statementId, statement) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) repl.interpretWithRedirectOutError(statement) match {