diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index 89b8f2d4a..b70c0392d 100644 --- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -88,6 +88,15 @@ object KyuubiSQLConf { .booleanConf .createWithDefault(true) + val ZORDER_GLOBAL_SORT_ENABLED = + buildConf("spark.sql.optimizer.zorderGlobalSort.enabled") + .doc("When true, we do a global sort using zorder. Note that, it can cause data skew " + + "issue if the zorder columns have less cardinality. When false, we only do local sort " + + "using zorder.") + .version("1.4.0") + .booleanConf + .createWithDefault(true) + val WATCHDOG_MAX_HIVEPARTITION = buildConf("spark.sql.watchdog.maxHivePartitions") .doc("Add maxHivePartitions Strategy to avoid scan excessive " + diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala index e76e81ff8..81deea07a 100644 --- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala +++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala @@ -146,7 +146,7 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { } Sort( SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil, - true, + conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED), plan ) } diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala index 33ed18a19..74e7c452e 100644 --- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala +++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.kyuubi.sql.KyuubiSQLConf import org.apache.kyuubi.sql.zorder.ZorderSqlExtensionsParser.{BigDecimalLiteralContext, BigIntLiteralContext, BooleanLiteralContext, DecimalLiteralContext, DoubleLiteralContext, IntegerLiteralContext, LogicalBinaryContext, MultipartIdentifierContext, NullLiteralContext, NumberContext, OptimizeZorderContext, PassThroughContext, QueryContext, SingleStatementContext, SmallIntLiteralContext, StringLiteralContext, TinyIntLiteralContext, TypeConstructorContext, ZorderClauseContext} abstract class ZorderSqlAstBuilderBase extends ZorderSqlExtensionsBaseVisitor[AnyRef] { @@ -89,7 +90,7 @@ abstract class ZorderSqlAstBuilderBase extends ZorderSqlExtensionsBaseVisitor[An val query = Sort( SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil, - true, + conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED), Project(Seq(UnresolvedStar(None)), tableWithFilter)) buildOptimizeZorderStatement(tableIdent, query) diff --git a/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala index 47e7839b3..f3e60852a 100644 --- a/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala +++ b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala @@ -537,6 +537,28 @@ trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper } } } + + test("Add config to control if zorder using global sort") { + withTable("t") { + withSQLConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false") { + sql( + """ + |CREATE TABLE t (c1 int, c2 string) TBLPROPERTIES ( + |'kyuubi.zorder.enabled'= 'true', + |'kyuubi.zorder.cols'= 'c1,c2') + |""".stripMargin) + val p1 = sql("OPTIMIZE t ZORDER BY c1, c2").queryExecution.analyzed + assert(p1.collect { + case shuffle: Sort if !shuffle.global => shuffle + }.size == 1) + + val p2 = sql("INSERT INTO TABLE t SELECT * FROM VALUES(1,'a')").queryExecution.analyzed + assert(p2.collect { + case shuffle: Sort if !shuffle.global => shuffle + }.size == 1) + } + } + } } class ZorderWithCodegenEnabledSuite extends ZorderSuite { diff --git a/docs/sql/rules.md b/docs/sql/rules.md index 0723ed4f2..ee7b33067 100644 --- a/docs/sql/rules.md +++ b/docs/sql/rules.md @@ -75,4 +75,5 @@ spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0 spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 +spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 spark.sql.watchdog.maxHivePartitions | none | Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table, it's optional that works with defined. | 1.4.0