[KYUUBI #2830] Improve Z-Order with Spark 3.3

### _Why are the changes needed?_

We can inject rebalance before Z-Order to avoid data skew.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2830 from ulysses-you/improve-zorder.

Closes #2830

789aba45 [ulysses-you] cleanup
e169a202 [ulysses-you] resolver
9134496c [ulysses-you] style
048fe294 [ulysses-you] docs
e06f1ef8 [ulysses-you] imporve zorder

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
ulysses-you 2022-06-09 11:16:24 +08:00 committed by ulysses-you
parent a06a2ca441
commit 9d706e55ed
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
4 changed files with 346 additions and 5 deletions

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NullsLast, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
/**
 * Spark 3.3 variant of the "insert z-order before writing" rewrite.
 *
 * Rewrites a resolved write query so its data is z-ordered before being written out,
 * optionally injecting a `RebalancePartitions` node first to avoid data skew
 * (`RebalancePartitions` is only available on Spark 3.3+, hence the `33` suffix).
 */
trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
  // Table properties that opt a table into z-order writes.
  private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
  private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"

  /**
   * Z-order applies only when the enable flag is present and "true"
   * (case-insensitive) AND the column list property is present.
   */
  def isZorderEnabled(props: Map[String, String]): Boolean = {
    props.contains(KYUUBI_ZORDER_ENABLED) &&
      "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
      props.contains(KYUUBI_ZORDER_COLS)
  }

  /**
   * Parses the comma-separated z-order column list; callers are expected to have
   * checked [[isZorderEnabled]] first (hence the assert on presence).
   */
  def getZorderColumns(props: Map[String, String]): Seq[String] = {
    val cols = props.get(KYUUBI_ZORDER_COLS)
    assert(cols.isDefined)
    cols.get.split(",").map(_.trim)
  }

  /**
   * Only inject z-order when the query does not already end in an explicit
   * sort/repartition — a user-specified ordering would be clobbered by ours.
   * Projections are looked through.
   */
  def canInsertZorder(query: LogicalPlan): Boolean = query match {
    case Project(_, child) => canInsertZorder(child)
    // TODO: actually, we can force zorder even if existed some shuffle
    case _: Sort => false
    case _: RepartitionByExpression => false
    case _: Repartition => false
    case _ => true
  }

  /**
   * Wraps `plan` in a `Sort` (and optionally one or two `RebalancePartitions`)
   * implementing the configured z-order strategy. Returns `plan` unchanged (same
   * reference, which callers check with `eq`) when z-order cannot or should not
   * be applied.
   *
   * @param catalogTable            target table carrying the z-order properties
   * @param plan                    the write's input query
   * @param dynamicPartitionColumns output attributes written as dynamic partitions
   */
  def insertZorder(
      catalogTable: CatalogTable,
      plan: LogicalPlan,
      dynamicPartitionColumns: Seq[Attribute]): LogicalPlan = {
    if (!canInsertZorder(plan)) {
      return plan
    }
    val cols = getZorderColumns(catalogTable.properties)
    val resolver = session.sessionState.conf.resolver
    val output = plan.output
    // Resolve each configured column name against the query output; the session
    // resolver honors the configured case sensitivity.
    val bound = cols.flatMap(col => output.find(attr => resolver(attr.name, col)))
    if (bound.size < cols.size) {
      // Misconfigured table: warn and leave the plan untouched rather than fail the write.
      logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
        s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
      plan
    } else {
      // Global sort and rebalance are mutually exclusive strategies.
      if (conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) &&
        conf.getConf(KyuubiSQLConf.REBALANCE_BEFORE_ZORDER)) {
        throw new KyuubiSQLExtensionException(s"${KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key} " +
          s"and ${KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key} can not be enabled together.")
      }
      if (conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) &&
        dynamicPartitionColumns.nonEmpty) {
        logWarning(s"Dynamic partition insertion with global sort may produce small files.")
      }

      // A single z-order column degenerates to a plain sort on that column;
      // otherwise either keep lexicographic ordering or build the interleaved z-value.
      val zorderExpr =
        if (bound.length == 1) {
          bound
        } else if (conf.getConf(KyuubiSQLConf.ZORDER_USING_ORIGINAL_ORDERING_ENABLED)) {
          bound.asInstanceOf[Seq[Expression]]
        } else {
          buildZorder(bound) :: Nil
        }
      // Decide (sort globality, sort keys, child plan) per strategy:
      //   global sort / rebalance-then-local-sort / plain local sort.
      val (global, orderExprs, child) =
        if (conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED)) {
          (true, zorderExpr, plan)
        } else if (conf.getConf(KyuubiSQLConf.REBALANCE_BEFORE_ZORDER)) {
          val rebalanceExpr =
            if (dynamicPartitionColumns.isEmpty) {
              // static partition insert
              bound
            } else if (conf.getConf(KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED)) {
              // improve data compression ratio
              dynamicPartitionColumns.asInstanceOf[Seq[Expression]] ++ bound
            } else {
              dynamicPartitionColumns.asInstanceOf[Seq[Expression]]
            }
          // for dynamic partition insert, Spark always sort the partition columns,
          // so here we sort partition columns + zorder.
          val rebalance =
            if (dynamicPartitionColumns.nonEmpty &&
              conf.getConf(KyuubiSQLConf.TWO_PHASE_REBALANCE_BEFORE_ZORDER)) {
              // improve compression ratio: first rebalance by partition columns only,
              // then rebalance again by the full expression set.
              RebalancePartitions(
                rebalanceExpr,
                RebalancePartitions(dynamicPartitionColumns, plan))
            } else {
              RebalancePartitions(rebalanceExpr, plan)
            }
          (false, dynamicPartitionColumns.asInstanceOf[Seq[Expression]] ++ zorderExpr, rebalance)
        } else {
          (false, zorderExpr, plan)
        }
      val order = orderExprs.map { expr =>
        SortOrder(expr, Ascending, NullsLast, Seq.empty)
      }
      Sort(order, global, child)
    }
  }

  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)

  /** Active session, supplied by the concrete rule (used for the resolver). */
  def session: SparkSession

  /** Per-write-command rewrite implemented by subclasses. */
  def applyInternal(plan: LogicalPlan): LogicalPlan

  // Global kill-switch: do nothing unless insert-zorder is enabled.
  final override def apply(plan: LogicalPlan): LogicalPlan = {
    if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
      applyInternal(plan)
    } else {
      plan
    }
  }
}
/**
 * Injects z-order sorting into datasource (non-Hive) write commands:
 * `INSERT INTO` over a HadoopFS relation and datasource CTAS.
 * Bucketed tables and writes without a catalog table are left untouched.
 */
case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
  extends InsertZorderHelper33 {
  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
    case insert: InsertIntoHadoopFsRelationCommand
        if insert.query.resolved &&
          insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
          isZorderEnabled(insert.catalogTable.get.properties) =>
      // Partition columns without a static value are the dynamic ones.
      val dynamicPartition =
        insert.partitionColumns.filterNot(attr => insert.staticPartitions.contains(attr.name))
      val rewritten = insertZorder(insert.catalogTable.get, insert.query, dynamicPartition)
      // insertZorder returns the same reference when it declined to rewrite.
      if (rewritten eq insert.query) insert else insert.copy(query = rewritten)
    case ctas: CreateDataSourceTableAsSelectCommand
        if ctas.query.resolved &&
          ctas.table.bucketSpec.isEmpty && isZorderEnabled(ctas.table.properties) =>
      // For CTAS every partition column is dynamic by construction.
      val dynamicPartition =
        ctas.query.output.filter(attr => ctas.table.partitionColumnNames.contains(attr.name))
      val rewritten = insertZorder(ctas.table, ctas.query, dynamicPartition)
      if (rewritten eq ctas.query) ctas else ctas.copy(query = rewritten)
    case _ => plan
  }
}
/**
 * Injects z-order sorting into Hive write commands: `INSERT INTO ... TABLE`,
 * Hive CTAS, and the optimized Hive CTAS. Bucketed tables are left untouched.
 */
case class InsertZorderBeforeWritingHive33(session: SparkSession)
  extends InsertZorderHelper33 {
  override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
    case insert: InsertIntoHiveTable
        if insert.query.resolved &&
          insert.table.bucketSpec.isEmpty && isZorderEnabled(insert.table.properties) =>
      // Partitions given without a literal value are dynamic; map each back to the
      // matching output attribute of the query.
      val dynamicPartition = insert.partition.filter(_._2.isEmpty).keys
        .flatMap(name => insert.query.output.find(_.name == name)).toSeq
      val rewritten = insertZorder(insert.table, insert.query, dynamicPartition)
      // insertZorder returns the same reference when it declined to rewrite.
      if (rewritten eq insert.query) insert else insert.copy(query = rewritten)
    case ctas: CreateHiveTableAsSelectCommand
        if ctas.query.resolved &&
          ctas.tableDesc.bucketSpec.isEmpty && isZorderEnabled(ctas.tableDesc.properties) =>
      // For CTAS every partition column is dynamic by construction.
      val dynamicPartition =
        ctas.query.output.filter(attr => ctas.tableDesc.partitionColumnNames.contains(attr.name))
      val rewritten = insertZorder(ctas.tableDesc, ctas.query, dynamicPartition)
      if (rewritten eq ctas.query) ctas else ctas.copy(query = rewritten)
    case octas: OptimizedCreateHiveTableAsSelectCommand
        if octas.query.resolved &&
          octas.tableDesc.bucketSpec.isEmpty && isZorderEnabled(octas.tableDesc.properties) =>
      val dynamicPartition =
        octas.query.output.filter(attr => octas.tableDesc.partitionColumnNames.contains(attr.name))
      val rewritten = insertZorder(octas.tableDesc, octas.query, dynamicPartition)
      if (rewritten eq octas.query) octas else octas.copy(query = rewritten)
    case _ => plan
  }
}

View File

@ -17,6 +17,95 @@
package org.apache.spark.sql
class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase {}
import org.apache.spark.sql.catalyst.plans.logical.{RebalancePartitions, Sort}
import org.apache.spark.sql.internal.SQLConf
class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.kyuubi.sql.zorder.Zorder
/**
 * Spark 3.3-only z-order tests: verifies that a RebalancePartitions node is
 * injected below the (local) z-order Sort, for both static and dynamic
 * partition inserts, and that two-phase rebalance produces two rebalances.
 */
trait ZorderWithCodegenEnabledSuiteBase33 extends ZorderWithCodegenEnabledSuiteBase {
  test("Add rebalance before zorder") {
    // useOriginalOrdering = "true" means lexicographic sort (no Zorder expression
    // in the plan); "false" means the interleaved Zorder expression is used.
    Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering, zorder) =>
      withSQLConf(
        KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false",
        KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key -> "true",
        KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED.key -> "true",
        KyuubiSQLConf.ZORDER_USING_ORIGINAL_ORDERING_ENABLED.key -> useOriginalOrdering) {
        withTable("t") {
          // 'C2' is deliberately upper-cased to exercise case-insensitive resolution.
          sql(
            """
              |CREATE TABLE t (c1 int, c2 string) PARTITIONED BY (d string)
              | TBLPROPERTIES (
              |'kyuubi.zorder.enabled'= 'true',
              |'kyuubi.zorder.cols'= 'c1,C2')
              |""".stripMargin)
          // Static partition insert: expect exactly one non-global sort whose
          // Zorder presence matches the expectation, and one rebalance on c1.
          val p = sql("INSERT INTO TABLE t PARTITION(d='a') SELECT * FROM VALUES(1,'a')")
            .queryExecution.analyzed
          assert(p.collect {
            case sort: Sort
                if !sort.global &&
                  ((sort.order.exists(_.child.isInstanceOf[Zorder]) && zorder) ||
                    (!sort.order.exists(_.child.isInstanceOf[Zorder]) && !zorder)) => sort
          }.size == 1)
          assert(p.collect {
            case rebalance: RebalancePartitions
                if rebalance.references.map(_.name).exists(_.equals("c1")) => rebalance
          }.size == 1)
          // Dynamic partition insert: sort and rebalance must also cover the
          // partition column 'd' in addition to the z-order columns.
          val p2 = sql("INSERT INTO TABLE t PARTITION(d) SELECT * FROM VALUES(1,'a','b')")
            .queryExecution.analyzed
          assert(p2.collect {
            case sort: Sort
                if (!sort.global && Seq("c1", "c2", "d").forall(x =>
                  sort.references.map(_.name).exists(_.equals(x)))) &&
                  ((sort.order.exists(_.child.isInstanceOf[Zorder]) && zorder) ||
                    (!sort.order.exists(_.child.isInstanceOf[Zorder]) && !zorder)) => sort
          }.size == 1)
          assert(p2.collect {
            case rebalance: RebalancePartitions
                if Seq("c1", "c2", "d").forall(x =>
                  rebalance.references.map(_.name).exists(_.equals(x))) => rebalance
          }.size == 1)
        }
      }
    }
  }

  test("Two phase rebalance before Z-Order") {
    withSQLConf(
      // Stop the optimizer from collapsing the two consecutive rebalances,
      // otherwise both phases could not be observed in the optimized plan.
      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
        "org.apache.spark.sql.catalyst.optimizer.CollapseRepartition",
      KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false",
      KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key -> "true",
      KyuubiSQLConf.TWO_PHASE_REBALANCE_BEFORE_ZORDER.key -> "true",
      KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED.key -> "true") {
      withTable("t") {
        sql(
          """
            |CREATE TABLE t (c1 int) PARTITIONED BY (d string)
            | TBLPROPERTIES (
            |'kyuubi.zorder.enabled'= 'true',
            |'kyuubi.zorder.cols'= 'c1')
            |""".stripMargin)
        val p = sql("INSERT INTO TABLE t PARTITION(d) SELECT * FROM VALUES(1,'a')")
        val rebalance = p.queryExecution.optimizedPlan.innerChildren
          .flatMap(_.collect { case r: RebalancePartitions => r })
        assert(rebalance.size == 2)
        // Outer (second-phase) rebalance covers partition column + z-order column.
        assert(rebalance.head.partitionExpressions.flatMap(_.references.map(_.name))
          .contains("d"))
        assert(rebalance.head.partitionExpressions.flatMap(_.references.map(_.name))
          .contains("c1"))
        // Inner (first-phase) rebalance covers the partition column only.
        assert(rebalance(1).partitionExpressions.flatMap(_.references.map(_.name))
          .contains("d"))
        assert(!rebalance(1).partitionExpressions.flatMap(_.references.map(_.name))
          .contains("c1"))
      }
    }
  }
}
// NOTE(review): both concrete suites extend the codegen-ENABLED base trait, so
// the "Disabled" suite would run with the same codegen setting as the "Enabled"
// one — confirm whether a ZorderWithCodegenDisabledSuiteBase mixin was intended.
class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase33 {}
class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenEnabledSuiteBase33 {}

View File

@ -97,6 +97,44 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(true)
// Inject a RebalancePartitions below the z-order sort so skewed input does not
// funnel into a handful of write tasks. Mutually exclusive with the global-sort
// strategy (enforced in InsertZorderHelper33). Spark 3.3+ only.
val REBALANCE_BEFORE_ZORDER =
  buildConf("spark.sql.optimizer.rebalanceBeforeZorder.enabled")
    .doc("when true, we do a rebalance before zorder in case data skew. " +
      "Note that, if the insertion is dynamic partition we will use the partition " +
      "columns to rebalance. Note that, this config only affects with Spark 3.3.x")
    .version("1.6.0")
    .booleanConf
    .createWithDefault(false)

// For dynamic partition inserts, rebalance by partition columns + z-order columns
// (instead of partition columns only) to improve the data compression ratio.
val REBALANCE_ZORDER_COLUMNS_ENABLED =
  buildConf("spark.sql.optimizer.rebalanceZorderColumns.enabled")
    .doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do rebalance before " +
      s"Z-Order. If it's dynamic partition insert, the rebalance expression will include " +
      s"both partition columns and Z-Order columns. Note that, this config only " +
      s"affects with Spark 3.3.x")
    .version("1.6.0")
    .booleanConf
    .createWithDefault(false)

// Dynamic-partition writes only: rebalance twice (partition columns first, then
// partition + z-order columns) before the z-order sort.
val TWO_PHASE_REBALANCE_BEFORE_ZORDER =
  buildConf("spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled")
    .doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do two phase rebalance " +
      s"before Z-Order for the dynamic partition write. The first phase rebalance using " +
      s"dynamic partition column; The second phase rebalance using dynamic partition column + " +
      s"Z-Order columns. Note that, this config only affects with Spark 3.3.x")
    .version("1.6.0")
    .booleanConf
    .createWithDefault(false)

// Sort by the z-order columns in their original lexicographical order instead of
// the interleaved z-value expression.
val ZORDER_USING_ORIGINAL_ORDERING_ENABLED =
  buildConf("spark.sql.optimizer.zorderUsingOriginalOrdering.enabled")
    .doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do sort by " +
      s"the original ordering i.e. lexicographical order. Note that, this config only " +
      s"affects with Spark 3.3.x")
    .version("1.6.0")
    .booleanConf
    .createWithDefault(false)
val WATCHDOG_MAX_PARTITIONS =
buildConf("spark.sql.watchdog.maxPartitions")
.doc("Set the max partition number when spark scans a data source. " +