From da2401c171536fbbc564ae331eef07f972171e8e Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 16 Oct 2024 10:36:45 +0800 Subject: [PATCH] [KYUUBI #6688] [SPARK] Avoid trigger execution when getting result schema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— `DataFrame.isEmpty` may trigger execution again, we should avoid it. ## Describe Your Solution ๐Ÿ”ง ## Types of changes :bookmark: - [X] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6688 from wForget/planonly_schema. Closes #6688 265f0ec26 [wforget] fix style d71cc4aa9 [wforget] refactor resultSchema for spark operation 0c36b3d25 [wforget] Avoid trigger execution when getting result schema Authored-by: wforget <643348094@qq.com> Signed-off-by: Bowen Liang --- .../kyuubi/engine/spark/operation/ExecutePython.scala | 4 ++-- .../kyuubi/engine/spark/operation/ExecuteScala.scala | 4 ++-- .../engine/spark/operation/ExecuteStatement.scala | 8 -------- .../engine/spark/operation/PlanOnlyStatement.scala | 6 +++--- .../engine/spark/operation/SetCurrentCatalog.scala | 6 ------ .../engine/spark/operation/SetCurrentDatabase.scala | 6 ------ .../kyuubi/engine/spark/operation/SparkOperation.scala | 10 +++++++++- 7 files changed, 16 insertions(+), 28 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index 771bb65ee..a3e090d23 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -59,14 +59,14 @@ class ExecutePython( override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { - if (result == null || result.schema.isEmpty) { + if (result == null) { new StructType().add("output", "string") .add("status", "string") .add("ename", "string") .add("evalue", "string") .add("traceback", "array") } else { - result.schema + super.resultSchema } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index 092e6e824..e8335e549 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -61,10 +61,10 @@ class ExecuteScala( override protected def supportProgress: Boolean = true override protected def resultSchema: StructType = { - if (result == null || result.schema.isEmpty) { + if (result == null) { new StructType().add("output", "string") } else { - result.schema + super.resultSchema } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index ca60de38f..53f02b2fb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -25,7 +25,6 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path import org.apache.spark.sql.DataFrame import org.apache.spark.sql.kyuubi.SparkDatasetHelper._ -import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP, OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE} @@ -50,13 +49,6 @@ class ExecuteStatement( private var fetchOrcStatement: Option[FetchOrcStatement] = None private var saveFilePath: Option[Path] = None - override protected def resultSchema: StructType = { - if (result == null || result.schema.isEmpty) { - new StructType().add("Result", "string") - } else { - result.schema - } - } override protected def beforeRun(): Unit = { OperationLog.setCurrentOperationLog(operationLog) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 42837120e..3c8d670c0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -59,9 +59,9 @@ class PlanOnlyStatement( override protected def resultSchema: StructType = { if (result == null) { new StructType().add("plan", "string") - } else if (result.isEmpty) { - new StructType().add("result", "string") - } else result.schema + } else { + super.resultSchema + } } override protected def beforeRun(): Unit = { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala index 88105b086..123dac66d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala @@ -17,8 +17,6 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.types.StructType - import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -29,10 +27,6 @@ class SetCurrentCatalog(session: Session, catalog: String) extends SparkOperatio override def getOperationLog: Option[OperationLog] = Option(operationLog) - override protected def resultSchema: StructType = { - new StructType() - } - override protected def runInternal(): Unit = { try { SparkCatalogUtils.setCurrentCatalog(spark, catalog) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala index d227f5fd2..170be4dcb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala @@ -17,8 +17,6 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.types.StructType - import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -29,10 +27,6 @@ class SetCurrentDatabase(session: Session, database: String) override def getOperationLog: Option[OperationLog] = Option(operationLog) - override protected def resultSchema: StructType = { - new StructType() - } - override protected def runInternal(): Unit = { try { spark.sessionState.catalogManager.setCurrentNamespace(Array(database)) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index b72447ae5..da01e85a6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -60,7 +60,15 @@ abstract class SparkOperation(session: Session) protected var result: DataFrame = _ - protected def resultSchema: StructType + protected def resultSchema: StructType = { + if (!hasResultSet) { + new StructType() + } else if (result == null || result.schema.isEmpty) { + new StructType().add("Result", "string") + } else { + result.schema + } + } override def redactedStatement: String = redact(spark.sessionState.conf.stringRedactionPattern, statement)