[KYUUBI #6688] [SPARK] Avoid trigger execution when getting result schema

# 🔍 Description
## Issue References 🔗

`DataFrame.isEmpty` may trigger execution again; we should avoid it.

## Describe Your Solution 🔧

## Types of changes 🔖

- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6688 from wForget/planonly_schema.

Closes #6688

265f0ec26 [wforget] fix style
d71cc4aa9 [wforget] refactor resultSchema for spark operation
0c36b3d25 [wforget] Avoid trigger execution when getting result schema

Authored-by: wforget <643348094@qq.com>
Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
This commit is contained in:
wforget 2024-10-16 10:36:45 +08:00 committed by Bowen Liang
parent 9c105da117
commit da2401c171
7 changed files with 16 additions and 28 deletions

View File

@@ -59,14 +59,14 @@ class ExecutePython(
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
if (result == null) {
new StructType().add("output", "string")
.add("status", "string")
.add("ename", "string")
.add("evalue", "string")
.add("traceback", "array<string>")
} else {
result.schema
super.resultSchema
}
}

View File

@@ -61,10 +61,10 @@ class ExecuteScala(
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
if (result == null) {
new StructType().add("output", "string")
} else {
result.schema
super.resultSchema
}
}

View File

@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP, OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
@@ -50,13 +49,6 @@ class ExecuteStatement(
private var fetchOrcStatement: Option[FetchOrcStatement] = None
private var saveFilePath: Option[Path] = None
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
} else {
result.schema
}
}
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)

View File

@@ -59,9 +59,9 @@ class PlanOnlyStatement(
override protected def resultSchema: StructType = {
if (result == null) {
new StructType().add("plan", "string")
} else if (result.isEmpty) {
new StructType().add("result", "string")
} else result.schema
} else {
super.resultSchema
}
}
override protected def beforeRun(): Unit = {

View File

@@ -17,8 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -29,10 +27,6 @@ class SetCurrentCatalog(session: Session, catalog: String) extends SparkOperatio
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def resultSchema: StructType = {
new StructType()
}
override protected def runInternal(): Unit = {
try {
SparkCatalogUtils.setCurrentCatalog(spark, catalog)

View File

@@ -17,8 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -29,10 +27,6 @@ class SetCurrentDatabase(session: Session, database: String)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def resultSchema: StructType = {
new StructType()
}
override protected def runInternal(): Unit = {
try {
spark.sessionState.catalogManager.setCurrentNamespace(Array(database))

View File

@@ -60,7 +60,15 @@ abstract class SparkOperation(session: Session)
protected var result: DataFrame = _
protected def resultSchema: StructType
protected def resultSchema: StructType = {
if (!hasResultSet) {
new StructType()
} else if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
} else {
result.schema
}
}
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)