[KYUUBI #6377] Fix isCommand check and set min rows threshold for saveToFile
# 🔍 Description ## Issue References 🔗 This pull request fixes # I found that, with saveToFile enabled with the default min size threshold, even if I run a simple `set` command, it also saves the result to file. <img width="1718" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/5bcc0da1-201a-453a-8568-d1bfadd7adef"> I think we need to skip this kind of query. ## Describe Your Solution 🔧 Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ #### Behavior With This Pull Request 🎉 #### Related Unit Tests --- # Checklist 📝 - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6377 from turboFei/check_is_DQL. Closes #6377 da9c2a921 [Wang, Fei] ut 04e20db5f [Wang, Fei] conf 8f20ed84b [Wang, Fei] refine the check f558dcca5 [Wang, Fei] ut c81340333 [Wang, Fei] DQL Authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
parent
12c5568c9b
commit
3439ea03f2
@ -420,6 +420,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
|
||||
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
|
||||
| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The Spark query result save dir; it should be publicly accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query result will be deleted when the query is finished. | string | 1.9.0 |
|
||||
| kyuubi.operation.result.saveToFile.enabled | false | The switch for Spark query result save to file. | boolean | 1.9.0 |
|
||||
| kyuubi.operation.result.saveToFile.minRows | 10000 | The minimum row count for a Spark query result to be saved to file; default value is 10000. | long | 1.9.1 |
|
||||
| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minimum size for a Spark query result to be saved to file; default value is 200 MB. We use Spark's `EstimationUtils#getSizePerRow` to estimate the output size of the execution plan. | long | 1.9.0 |
|
||||
| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
|
||||
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
|
||||
|
||||
@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import org.apache.kyuubi.{KyuubiSQLException, Logging}
|
||||
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
|
||||
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
|
||||
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
|
||||
import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
|
||||
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
|
||||
@ -172,10 +172,12 @@ class ExecuteStatement(
|
||||
})
|
||||
} else {
|
||||
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
|
||||
val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
|
||||
val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
|
||||
val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark)
|
||||
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
|
||||
resultMaxRows,
|
||||
resultSaveThreshold,
|
||||
resultSaveSizeThreshold,
|
||||
resultSaveRowsThreshold,
|
||||
result)) {
|
||||
saveFileName =
|
||||
Some(
|
||||
|
||||
@ -246,22 +246,33 @@ object SparkDatasetHelper extends Logging {
|
||||
case _ => None
|
||||
}
|
||||
|
||||
def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
|
||||
if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
|
||||
def shouldSaveResultToFs(
|
||||
resultMaxRows: Int,
|
||||
minSize: Long,
|
||||
minRows: Long,
|
||||
result: DataFrame): Boolean = {
|
||||
if (isCommandExec(result) ||
|
||||
(resultMaxRows > 0 && resultMaxRows < minRows) ||
|
||||
result.queryExecution.optimizedPlan.stats.rowCount.getOrElse(
|
||||
BigInt(Long.MaxValue)) < minRows) {
|
||||
return false
|
||||
}
|
||||
val finalLimit = optimizedPlanLimit(result.queryExecution) match {
|
||||
case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
|
||||
case Some(limit) => limit
|
||||
case None => resultMaxRows
|
||||
val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match {
|
||||
case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
|
||||
case Some(limit) => Some(limit)
|
||||
case None if resultMaxRows > 0 => Some(resultMaxRows)
|
||||
case _ => None
|
||||
}
|
||||
lazy val stats = if (finalLimit > 0) {
|
||||
finalLimit * EstimationUtils.getSizePerRow(
|
||||
result.queryExecution.executedPlan.output)
|
||||
} else {
|
||||
result.queryExecution.optimizedPlan.stats.sizeInBytes
|
||||
if (finalLimit.exists(_ < minRows)) {
|
||||
return false
|
||||
}
|
||||
lazy val colSize =
|
||||
val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes
|
||||
val stats = finalLimit.map { limit =>
|
||||
val estimateSize =
|
||||
limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
|
||||
estimateSize min sizeInBytes
|
||||
}.getOrElse(sizeInBytes)
|
||||
val colSize =
|
||||
if (result == null || result.schema.isEmpty) {
|
||||
0
|
||||
} else {
|
||||
@ -270,7 +281,8 @@ object SparkDatasetHelper extends Logging {
|
||||
minSize > 0 && colSize > 0 && stats >= minSize
|
||||
}
|
||||
|
||||
private def isCommandExec(nodeName: String): Boolean = {
|
||||
def isCommandExec(result: DataFrame): Boolean = {
|
||||
val nodeName = result.queryExecution.executedPlan.getClass.getName
|
||||
nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
|
||||
nodeName == "org.apache.spark.sql.execution.CommandResultExec"
|
||||
}
|
||||
|
||||
@ -42,4 +42,15 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
|
||||
spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
|
||||
}
|
||||
}
|
||||
|
||||
test("isCommandExec") {
|
||||
var query = "set"
|
||||
assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
|
||||
query = "explain set"
|
||||
assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
|
||||
query = "show tables"
|
||||
assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
|
||||
query = "select * from VALUES(1),(2),(3),(4) AS t(id)"
|
||||
assert(!SparkDatasetHelper.isCommandExec(spark.sql(query)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -2066,6 +2066,14 @@ object KyuubiConf {
|
||||
.checkValue(_ > 0, "must be positive value")
|
||||
.createWithDefault(200 * 1024 * 1024)
|
||||
|
||||
val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] =
|
||||
buildConf("kyuubi.operation.result.saveToFile.minRows")
|
||||
.doc("The minRows of Spark result save to file, default value is 10000.")
|
||||
.version("1.9.1")
|
||||
.longConf
|
||||
.checkValue(_ > 0, "must be positive value")
|
||||
.createWithDefault(10000)
|
||||
|
||||
val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
|
||||
buildConf("kyuubi.operation.incremental.collect")
|
||||
.internal
|
||||
|
||||
Loading…
Reference in New Issue
Block a user