diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md
index a3c2678cf..1ad338051 100644
--- a/docs/extensions/engines/spark/rules.md
+++ b/docs/extensions/engines/spark/rules.md
@@ -18,7 +18,7 @@

 # Auxiliary Optimization Rules

-Kyuubi provides SQL extension out of box. Due to the version compatibility with Apache Spark, currently we only support Apache Spark branch-3.1 (i.e 3.1.1 and 3.1.2).
+Kyuubi provides SQL extensions out of the box. Due to version compatibility with Apache Spark, we currently support Apache Spark branch-3.1 and later.
 And don't worry, Kyuubi will support the new Apache Spark version in the future. Thanks to the adaptive query execution framework (AQE), Kyuubi can do these optimizations.

 ## Features
@@ -68,17 +68,18 @@ Now, you can enjoy the Kyuubi SQL Extension.
 Kyuubi provides some configs to make these feature easy to use.

-| Name | Default Value | Description | Since |
-|------|---------------|-------------|-------|
-| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 |
-| spark.sql.optimizer.insertRepartitionNum | none | The partition number if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. If AQE is disabled, the default value is `spark.sql.shuffle.partitions`. If AQE is enabled, the default value is none that means depend on AQE. | 1.2.0 |
-| spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The partition number of each dynamic partition if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. | 1.2.0 |
-| spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join). | 1.2.0 |
-| spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 |
-| spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0 |
-| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 |
-| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 |
-| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 |
-| spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0 |
-| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled | false | When ture, infer columns for rebalance and sort orders from original query, e.g. the join keys from join. It can avoid compression ratio regression. | 1.7.0 |
-| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max columns of inferred columns. | 1.7.0 |
+| Name | Default Value | Description | Since |
+|------|---------------|-------------|-------|
+| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add a repartition node at the top of the query plan. An approach to merging small files. | 1.2.0 |
+| spark.sql.optimizer.insertRepartitionNum | none | The partition number if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. If AQE is disabled, the default value is `spark.sql.shuffle.partitions`. If AQE is enabled, the default value is none, which means it depends on AQE. | 1.2.0 |
+| spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The partition number of each dynamic partition if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. We repartition by dynamic partition columns to reduce the number of small files, but that can cause data skew. This config extends the partitions of the dynamic partition columns to avoid skew, but may generate some small files. | 1.2.0 |
+| spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure a shuffle node exists before shuffled joins (shj and smj) so that AQE's `OptimizeSkewedJoin` works (complex scenario joins, multi-table joins). | 1.2.0 |
+| spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage supports using a different config from previous stages. The prefix of a final stage config key should be `spark.sql.finalStage.`. For example, for the raw Spark config `spark.sql.adaptive.advisoryPartitionSizeInBytes`, the final stage config is `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 |
+| spark.sql.analyzer.classification.enabled | false | When true, allows the Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimize its configuration dynamically. | 1.4.0 |
+| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we follow the target table properties to decide whether to insert zorder. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we insert zorder before writing data. 2) `kyuubi.zorder.cols`: a comma-separated string; we zorder by these columns. | 1.4.0 |
+| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that it can cause data skew issues if the zorder columns have low cardinality. When false, we only do a local sort using zorder. | 1.4.0 |
+| spark.sql.watchdog.maxPartitions | none | Set the max partition number when Spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration, to avoid scanning excessive partitions on partitioned tables. It is optional and only takes effect when this configuration is set. | 1.4.0 |
+| spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition. | 1.5.0 |
+| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled | false | When true, infer columns for rebalance and sort orders from the original query, e.g. the join keys from a join. It can avoid compression ratio regression. | 1.7.0 |
+| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The maximum number of inferred columns. | 1.7.0 |
+| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add a repartition even if the original plan does not have a shuffle. | 1.7.0 |

diff --git a/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
index f978623af..fcafd6611 100644
--- a/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
@@ -26,14 +26,16 @@ import org.apache.kyuubi.sql.KyuubiSQLConf

 class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check repartition exists") {
-    def check(df: DataFrame, expectedRepartitionNum: Int = 1): Unit = {
-      assert(
-        df.queryExecution.analyzed.collect {
-          case r: RepartitionByExpression =>
-            assert(r.optNumPartitions ===
-              spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
-            r
-        }.size == expectedRepartitionNum)
+    def check(df: => DataFrame, expectedRepartitionNum: Int = 1): Unit = {
+      withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+        assert(
+          df.queryExecution.analyzed.collect {
+            case r: RepartitionByExpression =>
+              assert(r.optNumPartitions ===
+                spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
+              r
+          }.size == expectedRepartitionNum)
+      }
     }

     // It's better to set config explicitly in case of we change the default value.
@@ -108,7 +110,9 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
        }.isEmpty)
    }

-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      // test no write command
      check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
      check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
@@ -163,7 +167,8 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {

    withSQLConf(
      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
-      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
+      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      Seq("USING PARQUET", "").foreach { storage =>
        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
@@ -181,7 +186,8 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
  test("OptimizedCreateHiveTableAsSelectCommand") {
    withSQLConf(
      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
-      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      withTable("t") {
        val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
        val ctas = df.queryExecution.analyzed.collect {
diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f1a27cdb8..9fe188b56 100644
--- a/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -26,11 +26,19 @@ import org.apache.kyuubi.sql.KyuubiSQLConf

 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check rebalance exists") {
-    def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
-      assert(
-        df.queryExecution.analyzed.collect {
-          case r: RebalancePartitions => r
-        }.size == expectedRebalanceNum)
+    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+      withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+        assert(
+          df.queryExecution.analyzed.collect {
+            case r: RebalancePartitions => r
+          }.size == expectedRebalanceNum)
+      }
+      withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
+        assert(
+          df.queryExecution.analyzed.collect {
+            case r: RebalancePartitions => r
+          }.isEmpty)
+      }
     }

     // It's better to set config explicitly in case of we change the default value.
@@ -91,7 +99,9 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
        }.isEmpty)
    }

-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      // test no write command
      check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
      check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
@@ -144,7 +154,8 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {

    withSQLConf(
      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
-      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
+      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      Seq("USING PARQUET", "").foreach { storage =>
        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
@@ -162,7 +173,8 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
  test("OptimizedCreateHiveTableAsSelectCommand") {
    withSQLConf(
      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
-      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      withTable("t") {
        val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
        val ctas = df.queryExecution.analyzed.collect {
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 859b6256c..992891697 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -28,11 +28,19 @@ import org.apache.kyuubi.sql.KyuubiSQLConf

 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check rebalance exists") {
-    def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
-      assert(
-        df.queryExecution.analyzed.collect {
-          case r: RebalancePartitions => r
-        }.size == expectedRebalanceNum)
+    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+      withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+        assert(
+          df.queryExecution.analyzed.collect {
+            case r: RebalancePartitions => r
+          }.size == expectedRebalanceNum)
+      }
+      withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
+        assert(
+          df.queryExecution.analyzed.collect {
+            case r: RebalancePartitions => r
+          }.isEmpty)
+      }
     }

     // It's better to set config explicitly in case of we change the default value.
@@ -93,7 +101,9 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
        }.isEmpty)
    }

-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      // test no write command
      check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
      check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
@@ -146,7 +156,8 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {

    withSQLConf(
      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
-      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
+      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      Seq("USING PARQUET", "").foreach { storage =>
        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
@@ -164,7 +175,8 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
  test("OptimizedCreateHiveTableAsSelectCommand") {
    withSQLConf(
      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
-      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
      withTable("t") {
        val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
        val ctas = df.queryExecution.analyzed.collect {
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index f30ab2bc2..b5b7f31b7 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -176,4 +176,11 @@ object KyuubiSQLConf {
      .intConf
      .checkValue(_ > 0, "must be positive number")
      .createWithDefault(3)
+
+  val INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE =
+    buildConf("spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled")
+      .doc("When true, add repartition even if the original plan does not have shuffle.")
+      .version("1.7.0")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 33aff09a4..95f3529e2 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -108,14 +108,37 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder {
    }
  }

-trait RepartitionBeforeWriteHelper {
-  def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan match {
-    case Project(_, child) => canInsertRepartitionByExpression(child)
-    case SubqueryAlias(_, child) => canInsertRepartitionByExpression(child)
-    case Limit(_, _) => false
-    case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
-    case _ => true
+trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
+  private def hasBenefit(plan: LogicalPlan): Boolean = {
+    def probablyHasShuffle: Boolean = plan.find {
+      case _: Join => true
+      case _: Aggregate => true
+      case _: Distinct => true
+      case _: Deduplicate => true
+      case _: Window => true
+      case s: Sort if s.global => true
+      case _: RepartitionOperation => true
+      case _: GlobalLimit => true
+      case _ => false
+    }.isDefined
+
+    conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE) || probablyHasShuffle
+  }
+
+  def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
+    def canInsert(p: LogicalPlan): Boolean = p match {
+      case Project(_, child) => canInsert(child)
+      case SubqueryAlias(_, child) => canInsert(child)
+      case Limit(_, _) => false
+      case _: Sort => false
+      case _: RepartitionByExpression => false
+      case _: Repartition => false
+      case _ => true
+    }
+
+    // 1. make sure AQE is enabled, otherwise it is meaningless to add a shuffle
+    // 2. make sure it does not break the semantics of the original plan
+    // 3. try to avoid adding a shuffle if it has a potential performance regression
+    conf.adaptiveExecutionEnabled && canInsert(plan) && hasBenefit(plan)
  }
 }
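For readers skimming the patch, the decision the new `RepartitionBeforeWriteHelper` makes can be sketched outside Spark with a toy plan tree. The `Plan`, `Scan`, `Project`, `Join`, `Sort`, and `Limit` types below are simplified, hypothetical stand-ins for the real Catalyst `LogicalPlan` classes, and the `insertIfNoShuffle` flag models the new `spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled` config (AQE is assumed enabled):

```scala
// Toy plan ADT: just enough structure to illustrate the rule's decision.
sealed trait Plan {
  def children: Seq[Plan]
  // depth-first search for a node matching the predicate
  def find(f: Plan => Boolean): Option[Plan] =
    if (f(this)) Some(this) else children.view.flatMap(_.find(f)).headOption
}
case class Scan(name: String) extends Plan { val children = Nil }
case class Project(child: Plan) extends Plan { val children = Seq(child) }
case class Join(left: Plan, right: Plan) extends Plan { val children = Seq(left, right) }
case class Sort(global: Boolean, child: Plan) extends Plan { val children = Seq(child) }
case class Limit(child: Plan) extends Plan { val children = Seq(child) }

object RepartitionDecision {
  // Mirrors probablyHasShuffle: does any node in the tree already introduce a shuffle?
  // (The real rule also checks Aggregate, Window, Deduplicate, etc.)
  def probablyHasShuffle(p: Plan): Boolean = p.find {
    case _: Join          => true
    case Sort(global, _)  => global
    case _                => false
  }.isDefined

  // Mirrors canInsert: walk through Projects, refuse order-sensitive tops.
  def canInsert(p: Plan): Boolean = p match {
    case Project(child) => canInsert(child)
    case _: Limit       => false
    case _: Sort        => false
    case _              => true
  }

  // Add a repartition only if it is safe AND either the flag forces it
  // or the plan already shuffles somewhere (so AQE can coalesce for free).
  def shouldAddRepartition(p: Plan, insertIfNoShuffle: Boolean): Boolean =
    canInsert(p) && (insertIfNoShuffle || probablyHasShuffle(p))
}
```

With the flag off, a plain scan-and-write plan gets no extra shuffle (avoiding a potential regression); turning it on forces the repartition, which is exactly the behavior change the suites above exercise with `INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE` set to `"true"` vs `"false"`.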