diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala index 3343946e8..6bd014d6a 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.SparkSessionExtensions -import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder} +import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder} class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { @@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension { // should be applied before // RepartitionBeforeWriting and RebalanceBeforeWriting // because we can only apply one of them (i.e. 
Global Sort or Repartition/Rebalance)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
+    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
     extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
new file mode 100644
index 000000000..083c80afd
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.zorder
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NullsLast, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+
+import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
+
+/**
+ * Shared logic of the Spark 3.3 rules that put a Z-Order sort in front of a table write.
+ *
+ * A table opts in through the `kyuubi.zorder.enabled` and `kyuubi.zorder.cols` table
+ * properties. Depending on the session configs the written query is wrapped in a global
+ * sort, in a rebalance followed by a local sort, or in a plain local sort.
+ */
+trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
+  private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
+  private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
+
+  /**
+   * Whether the table properties switch Z-Order on (case-insensitive "true") and also
+   * declare the Z-Order columns.
+   */
+  def isZorderEnabled(props: Map[String, String]): Boolean = {
+    props.get(KYUUBI_ZORDER_ENABLED).exists(v => "true".equalsIgnoreCase(v)) &&
+    props.contains(KYUUBI_ZORDER_COLS)
+  }
+
+  /**
+   * Splits the comma separated `kyuubi.zorder.cols` property into trimmed column names.
+   *
+   * @throws KyuubiSQLExtensionException if the property is missing
+   */
+  def getZorderColumns(props: Map[String, String]): Seq[String] = {
+    val cols = props.getOrElse(
+      KYUUBI_ZORDER_COLS,
+      throw new KyuubiSQLExtensionException(
+        s"Table property $KYUUBI_ZORDER_COLS must be set to use Z-Order."))
+    cols.split(",").map(_.trim).toSeq
+  }
+
+  /**
+   * A sort is only inserted when the query is not, beneath any projections, already a
+   * `Sort`, `RepartitionByExpression` or `Repartition`.
+   */
+  def canInsertZorder(query: LogicalPlan): Boolean = query match {
+    case Project(_, child) => canInsertZorder(child)
+    // TODO: actually, we can force zorder even if existed some shuffle
+    case _: Sort => false
+    case _: RepartitionByExpression => false
+    case _: Repartition => false
+    case _ => true
+  }
+
+  /**
+   * Returns `plan` wrapped in the Z-Order sort of `catalogTable`, or `plan` itself when the
+   * sort can not be inserted or `plan` does not output every configured Z-Order column.
+   *
+   * @param catalogTable the written table; its properties carry the Z-Order columns
+   * @param plan the query that produces the rows to write
+   * @param dynamicPartitionColumns attributes of the dynamic partition columns of the write
+   * @throws KyuubiSQLExtensionException if global sort and rebalance are both enabled
+   */
+  def insertZorder(
+      catalogTable: CatalogTable,
+      plan: LogicalPlan,
+      dynamicPartitionColumns: Seq[Attribute]): LogicalPlan = {
+    if (!canInsertZorder(plan)) {
+      plan
+    } else {
+      val cols = getZorderColumns(catalogTable.properties)
+      val resolver = session.sessionState.conf.resolver
+      val output = plan.output
+      // a column that is missing from the output is dropped here, which shrinks `bound`
+      val bound = cols.flatMap(col => output.find(attr => resolver(attr.name, col)))
+      if (bound.size < cols.size) {
+        logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
+          s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
+        plan
+      } else {
+        sortByZorder(plan, bound, dynamicPartitionColumns)
+      }
+    }
+  }
+
+  /** Builds the sort (and the optional rebalance below it) over the bound columns. */
+  private def sortByZorder(
+      plan: LogicalPlan,
+      bound: Seq[Attribute],
+      dynamicPartitionColumns: Seq[Attribute]): LogicalPlan = {
+    val globalSort = conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED)
+    val rebalanceBeforeZorder = conf.getConf(KyuubiSQLConf.REBALANCE_BEFORE_ZORDER)
+    if (globalSort && rebalanceBeforeZorder) {
+      throw new KyuubiSQLExtensionException(s"${KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key} " +
+        s"and ${KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key} can not be enabled together.")
+    }
+    if (globalSort && dynamicPartitionColumns.nonEmpty) {
+      logWarning("Dynamic partition insertion with global sort may produce small files.")
+    }
+
+    // Seq is covariant, so the attributes widen to expressions without a cast
+    val partitionExprs: Seq[Expression] = dynamicPartitionColumns
+    val zorderExpr: Seq[Expression] =
+      if (bound.length == 1) {
+        // a single column is sorted on directly
+        bound
+      } else if (conf.getConf(KyuubiSQLConf.ZORDER_USING_ORIGINAL_ORDERING_ENABLED)) {
+        bound
+      } else {
+        buildZorder(bound) :: Nil
+      }
+    val (global, orderExprs, child) =
+      if (globalSort) {
+        (true, zorderExpr, plan)
+      } else if (rebalanceBeforeZorder) {
+        val rebalanceExpr: Seq[Expression] =
+          if (dynamicPartitionColumns.isEmpty) {
+            // static partition insert
+            bound
+          } else if (conf.getConf(KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED)) {
+            // improve data compression ratio
+            partitionExprs ++ bound
+          } else {
+            partitionExprs
+          }
+        // for dynamic partition insert, Spark always sort the partition columns,
+        // so here we sort partition columns + zorder.
+        val rebalance =
+          if (dynamicPartitionColumns.nonEmpty &&
+            conf.getConf(KyuubiSQLConf.TWO_PHASE_REBALANCE_BEFORE_ZORDER)) {
+            // improve compression ratio
+            RebalancePartitions(
+              rebalanceExpr,
+              RebalancePartitions(dynamicPartitionColumns, plan))
+          } else {
+            RebalancePartitions(rebalanceExpr, plan)
+          }
+        (false, partitionExprs ++ zorderExpr, rebalance)
+      } else {
+        (false, zorderExpr, plan)
+      }
+    val order = orderExprs.map { expr =>
+      SortOrder(expr, Ascending, NullsLast, Seq.empty)
+    }
+    Sort(order, global, child)
+  }
+
+  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
+
+  /** The session whose resolver is used to match the Z-Order column names. */
+  def session: SparkSession
+
+  /** The rewrite itself; `apply` only calls it when inserting Z-Order is switched on. */
+  def applyInternal(plan: LogicalPlan): LogicalPlan
+
+  final override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
+      applyInternal(plan)
+    } else {
+      plan
+    }
+  }
+}
+
+/**
+ * Inserts the Z-Order sort before datasource table writes, for inserts into and CTAS of
+ * non-bucketed tables that enable Z-Order.
+ */
+case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
+  extends InsertZorderHelper33 {
+  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
+    case insert: InsertIntoHadoopFsRelationCommand
+        if insert.query.resolved &&
+          insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
+          isZorderEnabled(insert.catalogTable.get.properties) =>
+      // partition columns without a static value are the dynamic ones
+      val dynamicPartition =
+        insert.partitionColumns.filterNot(attr => insert.staticPartitions.contains(attr.name))
+      val newQuery = insertZorder(insert.catalogTable.get, insert.query, dynamicPartition)
+      if (newQuery.eq(insert.query)) {
+        insert
+      } else {
+        insert.copy(query = newQuery)
+      }
+
+    case ctas: CreateDataSourceTableAsSelectCommand
+        if ctas.query.resolved &&
+          ctas.table.bucketSpec.isEmpty && isZorderEnabled(ctas.table.properties) =>
+      val dynamicPartition =
+        ctas.query.output.filter(attr => ctas.table.partitionColumnNames.contains(attr.name))
+      val newQuery = insertZorder(ctas.table, ctas.query, dynamicPartition)
+      if (newQuery.eq(ctas.query)) {
+        ctas
+      } else {
+        ctas.copy(query = newQuery)
+      }
+
+    case _ => plan
+  }
+}
+
+/**
+ * Inserts the Z-Order sort before Hive table writes, for inserts into and CTAS of
+ * non-bucketed tables that enable Z-Order.
+ */
+case class InsertZorderBeforeWritingHive33(session: SparkSession)
+  extends InsertZorderHelper33 {
+  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
+    case insert: InsertIntoHiveTable
+        if insert.query.resolved &&
+          insert.table.bucketSpec.isEmpty && isZorderEnabled(insert.table.properties) =>
+      // partition entries whose value is empty are treated as dynamic
+      // NOTE(review): names are matched with `==` here, not with the session resolver
+      // that `insertZorder` uses for the Z-Order columns - confirm this is intended
+      val dynamicPartition = insert.partition.filter(_._2.isEmpty).keys
+        .flatMap(name => insert.query.output.find(_.name == name)).toSeq
+      val newQuery = insertZorder(insert.table, insert.query, dynamicPartition)
+      if (newQuery.eq(insert.query)) {
+        insert
+      } else {
+        insert.copy(query = newQuery)
+      }
+
+    case ctas: CreateHiveTableAsSelectCommand
+        if ctas.query.resolved &&
+          ctas.tableDesc.bucketSpec.isEmpty && isZorderEnabled(ctas.tableDesc.properties) =>
+      val dynamicPartition =
+        ctas.query.output.filter(attr => ctas.tableDesc.partitionColumnNames.contains(attr.name))
+      val newQuery = insertZorder(ctas.tableDesc, ctas.query, dynamicPartition)
+      if (newQuery.eq(ctas.query)) {
+        ctas
+      } else {
+        ctas.copy(query = newQuery)
+      }
+
+    case octas: OptimizedCreateHiveTableAsSelectCommand
+        if octas.query.resolved &&
+          octas.tableDesc.bucketSpec.isEmpty && isZorderEnabled(octas.tableDesc.properties) =>
+      val dynamicPartition =
+        octas.query.output.filter(attr => octas.tableDesc.partitionColumnNames.contains(attr.name))
+      val newQuery = insertZorder(octas.tableDesc, octas.query, dynamicPartition)
+      if (newQuery.eq(octas.query)) {
+        octas
+      } else {
+        octas.copy(query = newQuery)
+      }
+
+    case _ => plan
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
index fd04e27db..90fc17e24 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -17,6 +17,106 @@
 
 package org.apache.spark.sql
 
-class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase {}
+import org.apache.spark.sql.catalyst.plans.logical.{RebalancePartitions, Sort}
+import org.apache.spark.sql.internal.SQLConf
 
-class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.zorder.Zorder
+
+/**
+ * Tests of the Spark 3.3 only rebalance-before-Z-Order options, asserted on the plans
+ * of inserts into a partitioned table with Z-Order enabled.
+ */
+trait ZorderWithCodegenEnabledSuiteBase33 extends ZorderWithCodegenEnabledSuiteBase {
+
+  test("Add rebalance before zorder") {
+    // with the original ordering no Zorder expression is expected in the sort
+    Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering, zorder) =>
+      withSQLConf(
+        KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false",
+        KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key -> "true",
+        KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED.key -> "true",
+        KyuubiSQLConf.ZORDER_USING_ORIGINAL_ORDERING_ENABLED.key -> useOriginalOrdering) {
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t (c1 int, c2 string) PARTITIONED BY (d string)
+              | TBLPROPERTIES (
+              |'kyuubi.zorder.enabled'= 'true',
+              |'kyuubi.zorder.cols'= 'c1,C2')
+              |""".stripMargin)
+          // static partition insert: one local sort, one rebalance that references c1
+          val p = sql("INSERT INTO TABLE t PARTITION(d='a') SELECT * FROM VALUES(1,'a')")
+            .queryExecution.analyzed
+          assert(p.collect {
+            case sort: Sort
+                if !sort.global &&
+                  sort.order.exists(_.child.isInstanceOf[Zorder]) == zorder => sort
+          }.size == 1)
+          assert(p.collect {
+            case rebalance: RebalancePartitions
+                if rebalance.references.map(_.name).exists(_.equals("c1")) => rebalance
+          }.size == 1)
+
+          // dynamic partition insert: sort and rebalance both reference c1, c2 and d
+          val names = Seq("c1", "c2", "d")
+          val p2 = sql("INSERT INTO TABLE t PARTITION(d) SELECT * FROM VALUES(1,'a','b')")
+            .queryExecution.analyzed
+          assert(p2.collect {
+            case sort: Sort
+                if !sort.global &&
+                  names.forall(x => sort.references.map(_.name).exists(_.equals(x))) &&
+                  sort.order.exists(_.child.isInstanceOf[Zorder]) == zorder => sort
+          }.size == 1)
+          assert(p2.collect {
+            case rebalance: RebalancePartitions
+                if names.forall(x =>
+                  rebalance.references.map(_.name).exists(_.equals(x))) => rebalance
+          }.size == 1)
+        }
+      }
+    }
+  }
+
+  test("Two phase rebalance before Z-Order") {
+    withSQLConf(
+      // CollapseRepartition is excluded, presumably so both rebalance nodes survive
+      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
+        "org.apache.spark.sql.catalyst.optimizer.CollapseRepartition",
+      KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false",
+      KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key -> "true",
+      KyuubiSQLConf.TWO_PHASE_REBALANCE_BEFORE_ZORDER.key -> "true",
+      KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED.key -> "true") {
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t (c1 int) PARTITIONED BY (d string)
+            | TBLPROPERTIES (
+            |'kyuubi.zorder.enabled'= 'true',
+            |'kyuubi.zorder.cols'= 'c1')
+            |""".stripMargin)
+        val p = sql("INSERT INTO TABLE t PARTITION(d) SELECT * FROM VALUES(1,'a')")
+        val rebalance = p.queryExecution.optimizedPlan.innerChildren
+          .flatMap(_.collect { case r: RebalancePartitions => r })
+        assert(rebalance.size == 2)
+        // first collected rebalance: partition column and Z-Order column
+        assert(rebalance.head.partitionExpressions.flatMap(_.references.map(_.name))
+          .contains("d"))
+        assert(rebalance.head.partitionExpressions.flatMap(_.references.map(_.name))
+          .contains("c1"))
+
+        // second collected rebalance: partition column only
+        assert(rebalance(1).partitionExpressions.flatMap(_.references.map(_.name))
+          .contains("d"))
+        assert(!rebalance(1).partitionExpressions.flatMap(_.references.map(_.name))
+          .contains("c1"))
+      }
+    }
+  }
+}
+
+class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase33 {}
+
+// Keeps the codegen-disabled base this suite had before the Spark 3.3 tests were added;
+// extending the enabled trait here would only repeat the codegen-enabled run.
+class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 
e3b6c9106..d5ddb788c 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -97,6 +97,47 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  // Options of the Z-Order write path; per their docs they only take effect on Spark 3.3.x.
+  val REBALANCE_BEFORE_ZORDER =
+    buildConf("spark.sql.optimizer.rebalanceBeforeZorder.enabled")
+      .doc("when true, we do a rebalance before zorder in case data skew. " +
+        "Note that, if the insertion is dynamic partition we will use the partition " +
+        "columns to rebalance. Note that, this config only affects with Spark 3.3.x")
+      .version("1.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val REBALANCE_ZORDER_COLUMNS_ENABLED =
+    buildConf("spark.sql.optimizer.rebalanceZorderColumns.enabled")
+      .doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do rebalance before " +
+        s"Z-Order. If it's dynamic partition insert, the rebalance expression will include " +
+        s"both partition columns and Z-Order columns. Note that, this config only " +
+        s"affects with Spark 3.3.x")
+      .version("1.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val TWO_PHASE_REBALANCE_BEFORE_ZORDER =
+    buildConf("spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled")
+      .doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do two phase rebalance " +
+        s"before Z-Order for the dynamic partition write. The first phase rebalance using " +
+        s"dynamic partition column; The second phase rebalance using dynamic partition column + " +
+        s"Z-Order columns. Note that, this config only affects with Spark 3.3.x")
+      .version("1.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  // The doc does not tie this flag to the rebalance config: the Spark 3.3 rule reads it
+  // before it chooses between global sort, rebalance and plain local sort.
+  val ZORDER_USING_ORIGINAL_ORDERING_ENABLED =
+    buildConf("spark.sql.optimizer.zorderUsingOriginalOrdering.enabled")
+      .doc("When true, we do sort by the original ordering i.e. lexicographical order " +
+        "of the Z-Order columns instead of the Z-Order value. Note that, this config only " +
+        "affects with Spark 3.3.x")
+      .version("1.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val WATCHDOG_MAX_PARTITIONS =
     buildConf("spark.sql.watchdog.maxPartitions")
       .doc("Set the max partition number when spark scans a data source. " +