From 43f40dcb8af8db2c1dc17ae366121e182ec8c2e4 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 13 May 2021 16:03:32 +0800 Subject: [PATCH] [KYUUBI #631] Add kyuubi-spark-extensions module ### _Why are the changes needed?_ The added sql module structure looks like: ``` kyuubi | - dev | - kyuubi-extension-spark_3.1 ``` This PR mainly add 3 feature: * merging small files automatically (include dynamic partition insertion case) * insert shuffle node before Join to make AQE `OptimizeSkewedJoin` work * stage level config isolation in AQE Note that, the sql rule depend on the Apache Spark interface so we need make the sql module verion independence. Currently, this PR only supports the Spark 3.1.1. Due to the version issue, we need to check and deploy this extension manually currently. ### _How was this patch tested?_ Add new test. Closes #631 from ulysses-you/add-sql-module. Closes #631 2cf12f1 [ulysses-you] version cfbf72c [ulysses-you] address comment 7740ca6 [ulysses-you] module name 0f723eb [ulysses-you] workflow 45c23d8 [ulysses-you] line 80378f5 [ulysses-you] assembly 95528aa [ulysses-you] move module 5fe5d87 [ulysses-you] license 6578440 [ulysses-you] init work Authored-by: ulysses-you Signed-off-by: Kent Yao --- .github/workflows/master.yml | 4 +- dev/kyuubi-extension-spark_3.1/pom.xml | 106 +++++ .../apache/kyuubi/sql/KyuubiAnalysis.scala | 172 +++++++++ .../kyuubi/sql/KyuubiEnsureRequirements.scala | 131 +++++++ .../sql/KyuubiQueryStagePreparation.scala | 241 ++++++++++++ .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 70 ++++ .../kyuubi/sql/KyuubiSparkSQLExtension.scala | 37 ++ .../spark/sql/KyuubiExtensionSuite.scala | 364 ++++++++++++++++++ kyuubi-assembly/pom.xml | 14 +- pom.xml | 10 + 10 files changed, 1146 insertions(+), 3 deletions(-) create mode 100644 dev/kyuubi-extension-spark_3.1/pom.xml create mode 100644 dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala create mode 100644 dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiEnsureRequirements.scala create mode 100644 dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiQueryStagePreparation.scala create mode 100644 dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala create mode 100644 dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala create mode 100644 dev/kyuubi-extension-spark_3.1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index a0198a40b..36ceafe50 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -35,8 +35,8 @@ jobs: profiles: - '' - '-Pspark-3.0 -Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.1 -Dspark.archive.name=spark-3.1.1-bin-hadoop2.7.tgz -Dmaven.plugin.scalatest.exclude.tags=org.apache.kyuubi.tags.ExtendedSQLTest,org.apache.kyuubi.tags.DataLakeTest' - - '-Pspark-3.1' - - '-Pspark-3.1 -Pspark-hadoop-3.2' + - '-Pspark-3.1 -Pkyuubi-sql-spark_3.1' + - '-Pspark-3.1 -Pkyuubi-sql-spark_3.1 -Pspark-hadoop-3.2' - '-Pspark-3.2-snapshot -pl :kyuubi-spark-sql-engine,:kyuubi-common,:kyuubi-ha,:kyuubi-zookeeper -Dmaven.plugin.scalatest.exclude.tags=org.apache.kyuubi.tags.ExtendedSQLTest,org.apache.kyuubi.tags.DataLakeTest' - '-DwildcardSuites=org.apache.kyuubi.operation.tpcds.TPCDSOutputSchemaSuite,org.apache.kyuubi.operation.tpcds.TPCDSDDLSuite -Dmaven.plugin.scalatest.exclude.tags=""' env: diff --git a/dev/kyuubi-extension-spark_3.1/pom.xml b/dev/kyuubi-extension-spark_3.1/pom.xml new file mode 100644 index 000000000..232e729d3 --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/pom.xml @@ -0,0 +1,106 @@ + + + + + + kyuubi + org.apache.kyuubi + 1.2.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + kyuubi-extension-spark_3.1 + jar + Kyuubi Project Dev Spark Extensions + + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.hadoop + hadoop-client-runtime + provided + + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test-jar + test + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + test-jar + test + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + test-jar + test + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + test-jar + test + + + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala new file mode 100644 index 000000000..c9a1d0bb2 --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql + +import java.util.Random + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, Multiply, Rand} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand +import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand +import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable} +import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.types.IntegerType + +import org.apache.kyuubi.sql.RepartitionBeforeWriteHelper._ + +/** + * For datasource table, there two commands can write data to table + * 1. InsertIntoHadoopFsRelationCommand + * 2. CreateDataSourceTableAsSelectCommand + * This rule add a repartition node between write and query + */ +case class RepartitionBeforeWrite(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + if (conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE)) { + addRepartition(plan) + } else { + plan + } + } + + private def addRepartition(plan: LogicalPlan): LogicalPlan = plan match { + case i @ InsertIntoHadoopFsRelationCommand(_, sp, _, pc, bucket, _, _, query, _, _, _, _) + if query.resolved && bucket.isEmpty && canInsertRepartitionByExpression(query) => + val dynamicPartitionColumns = pc.filterNot(attr => sp.contains(attr.name)) + if (dynamicPartitionColumns.isEmpty) { + i.copy(query = + RepartitionByExpression( + Seq.empty, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } else { + val extended = dynamicPartitionColumns ++ dynamicPartitionExtraExpression( + conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM)) + i.copy(query = + RepartitionByExpression( + extended, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } + + case c @ CreateDataSourceTableAsSelectCommand(table, _, query, _) + if query.resolved && table.bucketSpec.isEmpty && canInsertRepartitionByExpression(query) => + val dynamicPartitionColumns = + query.output.filter(attr => table.partitionColumnNames.contains(attr.name)) + if (dynamicPartitionColumns.isEmpty) { + c.copy(query = + RepartitionByExpression( + Seq.empty, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } else { + val extended = dynamicPartitionColumns ++ dynamicPartitionExtraExpression( + conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM)) + c.copy(query = + RepartitionByExpression( + extended, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } + + case _ => plan + } +} + +/** + * For Hive table, there two commands can write data to table + * 1. InsertIntoHiveTable + * 2. CreateHiveTableAsSelectCommand + * This rule add a repartition node between write and query + */ +case class RepartitionBeforeWriteHive(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + if (conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" && + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE)) { + addRepartition(plan) + } else { + plan + } + } + + def addRepartition(plan: LogicalPlan): LogicalPlan = plan match { + case i @ InsertIntoHiveTable(table, partition, query, _, _, _) + if query.resolved && table.bucketSpec.isEmpty && canInsertRepartitionByExpression(query) => + val dynamicPartitionColumns = partition.filter(_._2.isEmpty).keys + .flatMap(name => query.output.find(_.name == name)).toSeq + if (dynamicPartitionColumns.isEmpty) { + i.copy(query = + RepartitionByExpression( + Seq.empty, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } else { + val extended = dynamicPartitionColumns ++ dynamicPartitionExtraExpression( + conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM)) + i.copy(query = + RepartitionByExpression( + extended, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } + + case c @ CreateHiveTableAsSelectCommand(table, query, _, _) + if query.resolved && table.bucketSpec.isEmpty && canInsertRepartitionByExpression(query) => + val dynamicPartitionColumns = + query.output.filter(attr => table.partitionColumnNames.contains(attr.name)) + if (dynamicPartitionColumns.isEmpty) { + c.copy(query = + RepartitionByExpression( + Seq.empty, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } else { + val extended = dynamicPartitionColumns ++ dynamicPartitionExtraExpression( + conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM)) + c.copy(query = + RepartitionByExpression( + extended, + query, + conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))) + } + + case _ => plan + } +} + +object RepartitionBeforeWriteHelper { + def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan match { + case Limit(_, _) => false + case _: Sort => false + case _: RepartitionByExpression => false + case _: Repartition => false + case _ => true + } + + def dynamicPartitionExtraExpression(partitionNumber: Int): Seq[Expression] = { + // Dynamic partition insertion will add repartition by partition column, but it could cause + // data skew (one partition value has large data). So we add extra partition column for the + // same dynamic partition to avoid skew. + Cast(Multiply( + new Rand(Literal(new Random().nextLong())), + Literal(partitionNumber.toDouble) + ), IntegerType) :: Nil + } +} diff --git a/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiEnsureRequirements.scala b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiEnsureRequirements.scala new file mode 100644 index 000000000..8f8942cb1 --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiEnsureRequirements.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql + +import org.apache.spark.sql.catalyst.expressions.SortOrder +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SortExec, SparkPlan} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} + +/** + * Copy from Apache Spark `EnsureRequirements` + * 1. remove reorder join predicates + * 2. remove shuffle pruning + */ +object KyuubiEnsureRequirements extends Rule[SparkPlan] { + private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { + val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution + val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering + var children: Seq[SparkPlan] = operator.children + assert(requiredChildDistributions.length == children.length) + assert(requiredChildOrderings.length == children.length) + + // Ensure that the operator's children satisfy their output distribution requirements. + children = children.zip(requiredChildDistributions).map { + case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + child + case (child, BroadcastDistribution(mode)) => + BroadcastExchangeExec(mode, child) + case (child, distribution) => + val numPartitions = distribution.requiredNumPartitions + .getOrElse(conf.numShufflePartitions) + ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) + } + + // Get the indexes of children which have specified distribution requirements and need to have + // same number of partitions. + val childrenIndexes = requiredChildDistributions.zipWithIndex.filter { + case (UnspecifiedDistribution, _) => false + case (_: BroadcastDistribution, _) => false + case _ => true + }.map(_._2) + + val childrenNumPartitions = + childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet + + if (childrenNumPartitions.size > 1) { + // Get the number of partitions which is explicitly required by the distributions. + val requiredNumPartitions = { + val numPartitionsSet = childrenIndexes.flatMap { + index => requiredChildDistributions(index).requiredNumPartitions + }.toSet + assert(numPartitionsSet.size <= 1, + s"$operator have incompatible requirements of the number of partitions for its children") + numPartitionsSet.headOption + } + + // If there are non-shuffle children that satisfy the required distribution, we have + // some tradeoffs when picking the expected number of shuffle partitions: + // 1. We should avoid shuffling these children. + // 2. We should have a reasonable parallelism. + val nonShuffleChildrenNumPartitions = + childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec]) + .map(_.outputPartitioning.numPartitions) + val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) { + if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) { + // Here we pick the max number of partitions among these non-shuffle children. + nonShuffleChildrenNumPartitions.max + } else { + // Here we pick the max number of partitions among these non-shuffle children as the + // expected number of shuffle partitions. However, if it's smaller than + // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the + // expected number of shuffle partitions. + math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions) + } + } else { + childrenNumPartitions.max + } + + val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions) + + children = children.zip(requiredChildDistributions).zipWithIndex.map { + case ((child, distribution), index) if childrenIndexes.contains(index) => + if (child.outputPartitioning.numPartitions == targetNumPartitions) { + child + } else { + val defaultPartitioning = distribution.createPartitioning(targetNumPartitions) + child match { + // If child is an exchange, we replace it with a new one having defaultPartitioning. + case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c) + case _ => ShuffleExchangeExec(defaultPartitioning, child) + } + } + + case ((child, _), _) => child + } + } + + // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: + children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => + // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. + if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) { + child + } else { + SortExec(requiredOrdering, global = false, child = child) + } + } + + operator.withNewChildren(children) + } + + def apply(plan: SparkPlan): SparkPlan = plan.transformUp { + case operator: SparkPlan => + ensureDistributionAndOrdering(operator) + } +} diff --git a/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiQueryStagePreparation.scala b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiQueryStagePreparation.scala new file mode 100644 index 000000000..6c48a3891 --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiQueryStagePreparation.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.physical.Distribution +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SortExec, SparkPlan} +import org.apache.spark.sql.execution.adaptive.QueryStageExec +import org.apache.spark.sql.execution.aggregate.BaseAggregateExec +import org.apache.spark.sql.execution.command.{ResetCommand, SetCommand} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} +import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.internal.SQLConf + +import org.apache.kyuubi.sql.KyuubiSQLConf._ + +/** + * Insert shuffle node before join if it doesn't exist to make `OptimizeSkewedJoin` works. + */ +object InsertShuffleNodeBeforeJoin extends Rule[SparkPlan] { + + override def apply(plan: SparkPlan): SparkPlan = { + // this rule has no meaning without AQE + if (!conf.getConf(FORCE_SHUFFLE_BEFORE_JOIN) || + !conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) { + return plan + } + + val newPlan = insertShuffleBeforeJoin(plan) + if (plan.fastEquals(newPlan)) { + plan + } else { + // make sure the output partitioning and ordering will not be broken. + KyuubiEnsureRequirements.apply(newPlan) + } + } + + private def insertShuffleBeforeJoin(plan: SparkPlan): SparkPlan = plan transformUp { + case smj @ SortMergeJoinExec(_, _, _, _, l, r, _) => + smj.withNewChildren(checkAndInsertShuffle(smj.requiredChildDistribution.head, l) :: + checkAndInsertShuffle(smj.requiredChildDistribution(1), r) :: Nil) + + case shj: ShuffledHashJoinExec => + if (!shj.left.isInstanceOf[Exchange] && !shj.right.isInstanceOf[Exchange]) { + shj.withNewChildren(withShuffleExec(shj.requiredChildDistribution.head, shj.left) :: + withShuffleExec(shj.requiredChildDistribution(1), shj.right) :: Nil) + } else if (!shj.left.isInstanceOf[Exchange]) { + shj.withNewChildren( + withShuffleExec(shj.requiredChildDistribution.head, shj.left) :: shj.right :: Nil) + } else if (!shj.right.isInstanceOf[Exchange]) { + shj.withNewChildren( + shj.left :: withShuffleExec(shj.requiredChildDistribution(1), shj.right) :: Nil) + } else { + shj + } + } + + private def checkAndInsertShuffle( + distribution: Distribution, child: SparkPlan): SparkPlan = child match { + case SortExec(_, _, _: Exchange, _) => + child + case SortExec(_, _, _: QueryStageExec, _) => + child + case sort @ SortExec(_, _, agg: BaseAggregateExec, _) => + sort.withNewChildren(withShuffleExec(distribution, agg) :: Nil) + case _ => + withShuffleExec(distribution, child) + } + + private def withShuffleExec(distribution: Distribution, child: SparkPlan): SparkPlan = { + val numPartitions = distribution.requiredNumPartitions + .getOrElse(conf.numShufflePartitions) + ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) + } +} + + +/** + * This rule split stage into two parts: + * 1. previous stage + * 2. final stage + * For final stage, we can inject extra config. It's useful if we use repartition to optimize + * small files that needs bigger shuffle partition size than previous. + * + * Let's say we have a query with 3 stages, then the logical machine like: + * + * Set/Reset Command -> cleanup previousStage config if user set the spark config. + * Query -> AQE -> stage1 -> preparation (use previousStage to overwrite spark config) + * -> AQE -> stage2 -> preparation (use spark config) + * -> AQE -> stage3 -> preparation (use finalStage config to overwrite spark config, + * store spark config to previousStage.) + * + * An example of the new finalStage config: + * `spark.sql.adaptive.advisoryPartitionSizeInBytes` -> + * `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes` + */ +case class FinalStageConfigIsolation(session: SparkSession) extends Rule[SparkPlan] { + import FinalStageConfigIsolation._ + + override def apply(plan: SparkPlan): SparkPlan = { + // this rule has no meaning without AQE + if (!conf.getConf(FINAL_STAGE_CONFIG_ISOLATION) || + !conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) { + return plan + } + + if (isFinalStage(plan)) { + // set config for final stage + session.conf.getAll.filter(_._1.startsWith(FINAL_STAGE_CONFIG_PREFIX)).foreach { + case (k, v) => + val sparkConfigKey = s"spark.sql.${k.substring(FINAL_STAGE_CONFIG_PREFIX.length)}" + val previousStageConfigKey = + s"$PREVIOUS_STAGE_CONFIG_PREFIX${k.substring(FINAL_STAGE_CONFIG_PREFIX.length)}" + // store the previous config only if we have not stored, to avoid some query only + // have one stage that will overwrite real config. + if (!session.sessionState.conf.contains(previousStageConfigKey)) { + val originalValue = if (session.conf.getOption(sparkConfigKey).isDefined) { + session.sessionState.conf.getConfString(sparkConfigKey) + } else { + // the default value of config is None, so we need to use a internal tag + INTERNAL_UNSET_CONFIG_TAG + } + logInfo(s"Store config: $sparkConfigKey to previousStage, " + + s"original value: $originalValue ") + session.sessionState.conf.setConfString(previousStageConfigKey, originalValue) + } + logInfo(s"For final stage: set $sparkConfigKey = $v.") + session.conf.set(sparkConfigKey, v) + } + } else { + // reset config for previous stage + session.conf.getAll.filter(_._1.startsWith(PREVIOUS_STAGE_CONFIG_PREFIX)).foreach { + case (k, v) => + val sparkConfigKey = s"spark.sql.${k.substring(PREVIOUS_STAGE_CONFIG_PREFIX.length)}" + logInfo(s"For previous stage: set $sparkConfigKey = $v.") + if (v == INTERNAL_UNSET_CONFIG_TAG) { + session.conf.unset(sparkConfigKey) + } else { + session.conf.set(sparkConfigKey, v) + } + // unset config so that we do not need to reset configs for every previous stage + session.conf.unset(k) + } + } + + plan + } + + /** + * Currently formula depend on AQE in Spark 3.1.1, not sure it can work in future. + */ + private def isFinalStage(plan: SparkPlan): Boolean = { + var shuffleNum = 0 + var broadcastNum = 0 + var queryStageNum = 0 + + def collectNumber(p: SparkPlan): SparkPlan = { + p transform { + case shuffle: ShuffleExchangeLike => + shuffleNum += 1 + shuffle + + case broadcast: BroadcastExchangeLike => + broadcastNum += 1 + broadcast + + // query stage is leaf node so we need to transform it manually + case queryStage: QueryStageExec => + queryStageNum += 1 + collectNumber(queryStage.plan) + queryStage + } + } + collectNumber(plan) + + if (shuffleNum == 0) { + // we don not care about broadcast stage here since it won't change partition number. + true + } else if (shuffleNum + broadcastNum == queryStageNum) { + true + } else { + false + } + } +} +object FinalStageConfigIsolation { + final val SQL_PREFIX = "spark.sql." + final val FINAL_STAGE_CONFIG_PREFIX = "spark.sql.finalStage." + final val PREVIOUS_STAGE_CONFIG_PREFIX = "spark.sql.previousStage." + final val INTERNAL_UNSET_CONFIG_TAG = "__INTERNAL_UNSET_CONFIG_TAG__" + + def getPreviousStageConfigKey(configKey: String): Option[String] = { + if (configKey.startsWith(SQL_PREFIX)) { + Some(s"$PREVIOUS_STAGE_CONFIG_PREFIX${configKey.substring(SQL_PREFIX.length)}") + } else { + None + } + } +} + +case class FinalStageConfigIsolationCleanRule(session: SparkSession) extends Rule[LogicalPlan] { + import FinalStageConfigIsolation._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan match { + case set @ SetCommand(Some((k, Some(_)))) if k.startsWith(SQL_PREFIX) => + checkAndUnsetPreviousStageConfig(k) + set + + case reset @ ResetCommand(Some(k)) if k.startsWith(SQL_PREFIX) => + checkAndUnsetPreviousStageConfig(k) + reset + + case other => other + } + + private def checkAndUnsetPreviousStageConfig(configKey: String): Unit = { + getPreviousStageConfigKey(configKey).foreach { previousStageConfigKey => + if (session.sessionState.conf.contains(previousStageConfigKey)) { + logInfo(s"For previous stage: unset $previousStageConfigKey") + session.conf.unset(previousStageConfigKey) + } + } + } +} diff --git a/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala new file mode 100644 index 000000000..099be65e8 --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf._ + +object KyuubiSQLConf { + + val INSERT_REPARTITION_BEFORE_WRITE = + buildConf("spark.sql.optimizer.insertRepartitionBeforeWrite.enabled") + .doc("Add repartition node at the top of plan. A approach of merging small files.") + .version("1.2.0") + .booleanConf + .createWithDefault(true) + + val INSERT_REPARTITION_NUM = + buildConf("spark.sql.optimizer.insertRepartitionNum") + .doc(s"The partition number if ${INSERT_REPARTITION_BEFORE_WRITE.key} is enabled. " + + s"If AQE is disabled, the default value is ${SQLConf.SHUFFLE_PARTITIONS}. " + + s"If AQE is enabled, the default value is none that means depend on AQE.") + .version("1.2.0") + .intConf + .createOptional + + val DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM = + buildConf("spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum") + .doc(s"The partition number of each dynamic partition if " + + s"${INSERT_REPARTITION_BEFORE_WRITE.key} is enabled. " + + s"We will repartition by dynamic partition columns to reduce the small file but that " + + s"can cause data skew. This config is to extend the partition of dynamic " + + s"partition column to avoid skew but may generate some small files.") + .version("1.2.0") + .intConf + .createWithDefault(100) + + val FORCE_SHUFFLE_BEFORE_JOIN = + buildConf("spark.sql.optimizer.forceShuffleBeforeJoin.enabled") + .doc("Ensure shuffle node exists before shuffled join (shj and smj) to make AQE " + + "`OptimizeSkewedJoin` works (extra shuffle, multi table join).") + .version("1.2.0") + .booleanConf + .createWithDefault(false) + + val FINAL_STAGE_CONFIG_ISOLATION = + buildConf("spark.sql.optimizer.finalStageConfigIsolation.enabled") + .doc("If true, the final stage support use different config with previous stage. The final " + + "stage config key prefix should be `spark.sql.finalStage.`." + + "For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, " + + "then the final stage config should be: " + + "`spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`.") + .version("1.2.0") + .booleanConf + .createWithDefault(false) +} diff --git a/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala new file mode 100644 index 000000000..bcd2afd7b --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql + +import org.apache.spark.sql.SparkSessionExtensions + +// scalastyle:off line.size.limit +/** + * Depend on Spark SQL Extension framework, we can use this extension follow steps + * 1. move this jar into $SPARK_HOME/jars + * 2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension` + */ +// scalastyle:on line.size.limit +class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { + override def apply(extensions: SparkSessionExtensions): Unit = { + extensions.injectPostHocResolutionRule(RepartitionBeforeWrite) + extensions.injectPostHocResolutionRule(RepartitionBeforeWriteHive) + extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule) + extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin) + extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_)) + } +} diff --git a/dev/kyuubi-extension-spark_3.1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark_3.1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala new file mode 100644 index 000000000..3a128e8f1 --- /dev/null +++ b/dev/kyuubi-extension-spark_3.1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply} +import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec} +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike} +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.test.SQLTestData.TestData +import org.apache.spark.sql.test.SQLTestUtils + +import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf} + +class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper { + + var _spark: SparkSession = _ + override def spark: SparkSession = _spark + + protected override def beforeAll(): Unit = { + _spark = SparkSession.builder() + .master("local[1]") + .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, + "org.apache.kyuubi.sql.KyuubiSparkSQLExtension") + .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") + .config("spark.ui.enabled", "false") + .enableHiveSupport() + .getOrCreate() + setupData() + super.beforeAll() + } + + protected override def afterAll(): Unit = { + super.afterAll() + cleanupData() + if (_spark != null) { + _spark.stop() + } + } + + private def setupData(): Unit = { + val self = _spark + import self.implicits._ + spark.sparkContext.parallelize( + (1 to 100).map(i => TestData(i, i.toString)), 10) + .toDF("c1", "c2").createOrReplaceTempView("t1") + spark.sparkContext.parallelize( + (1 to 10).map(i => TestData(i, i.toString)), 5) + .toDF("c1", "c2").createOrReplaceTempView("t2") + spark.sparkContext.parallelize( + (1 to 50).map(i => TestData(i, i.toString)), 2) + .toDF("c1", "c2").createOrReplaceTempView("t3") + } + + private def cleanupData(): Unit = { + spark.sql("DROP VIEW IF EXISTS t1") + spark.sql("DROP VIEW IF EXISTS t2") + spark.sql("DROP VIEW IF EXISTS t3") + } + + test("check repartition exists") { + def check(df: DataFrame): Unit = { + assert( + df.queryExecution.analyzed.collect { + case r: RepartitionByExpression => + assert(r.optNumPartitions === + spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)) + r + }.size == 1 + ) + } + + // It's better to set config explicitly in case of we change the default value. + withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") { + Seq("USING PARQUET", "").foreach { storage => + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)") + check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " + + "SELECT * FROM VALUES(1),(2) AS t(c1)")) + } + + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage") + check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)")) + check(sql("INSERT INTO TABLE tmp1 " + + "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")) + } + + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)") + } + + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " + + s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)") + } + } + } + } + + test("check repartition does not exists") { + def check(df: DataFrame): Unit = { + assert( + df.queryExecution.analyzed.collect { + case r: RepartitionByExpression => r + }.isEmpty + ) + } + + withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") { + // test no write command + check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")) + check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")) + + // test not supported plan + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)") + check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " + + "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")) + check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " + + "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1")) + check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " + + "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10")) + } + } + + withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") { + Seq("USING PARQUET", "").foreach { storage => + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)") + check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " + + "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")) + } + + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage") + check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)")) + } + + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)") + } + + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " + + s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)") + } + } + } + } + + test("test dynamic partition write") { + def checkRepartitionExpression(df: DataFrame): Unit = { + assert(df.queryExecution.analyzed.collect { + case r: RepartitionByExpression if r.partitionExpressions.size == 2 => + assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2") + assert(r.partitionExpressions(1).asInstanceOf[Cast].child.asInstanceOf[Multiply] + .left.asInstanceOf[Attribute].name.startsWith("_nondeterministic")) + r + }.size == 1) + } + + withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true", + KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") { + Seq("USING PARQUET", "").foreach { storage => + withTable("tmp1") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)") + checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 ")) + } + + withTable("tmp1") { + checkRepartitionExpression( + sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 ")) + } + } + } + } + + test("force shuffle before join") { + def checkShuffleNodeNum(sqlString: String, num: Int): Unit = { + var expectedResult: Seq[Row] = Seq.empty + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + expectedResult = sql(sqlString).collect() + } + val df = sql(sqlString) + checkAnswer(df, expectedResult) + assert( + collect(df.queryExecution.executedPlan) { + case shuffle: ShuffleExchangeLike + if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle + }.size == num) + } + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") { + Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint => + // positive case + checkShuffleNodeNum( + s""" + |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1 + | JOIN t2 ON t1.c1 = t2.c1 + | JOIN t3 ON t1.c1 = t3.c1 + | """.stripMargin, 4) + + // negative case + checkShuffleNodeNum( + s""" + |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1 + | JOIN t2 ON t1.c1 = t2.c1 + | JOIN t3 ON t1.c2 = t3.c2 + | """.stripMargin, 4) + } + + checkShuffleNodeNum( + """ + |SELECT t1.c1, t2.c1, t3.c2 from t1 + | JOIN t2 ON t1.c1 = t2.c1 + | JOIN ( + | SELECT c2, count(*) FROM t1 GROUP BY c2 + | ) t3 ON t1.c1 = t3.c2 + | """.stripMargin, 5) + + checkShuffleNodeNum( + """ + |SELECT t1.c1, t2.c1, t3.c1 from t1 + | JOIN t2 ON t1.c1 = t2.c1 + | JOIN ( + | SELECT c1, count(*) FROM t1 GROUP BY c1 + | ) t3 ON t1.c1 = t3.c1 + | """.stripMargin, 5) + } + } + + test("final stage config set reset check") { + withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true", + "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1", + "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") { + // use loop to double check final stage config doesn't affect the sql query each other + (1 to 3).foreach { _ => + sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect() + assert(spark.sessionState.conf.getConfString( + "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") === + FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG) + assert(spark.sessionState.conf.getConfString( + "spark.sql.adaptive.coalescePartitions.minPartitionNum") === + "1") + assert(spark.sessionState.conf.getConfString( + "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") === + "1") + + // 64MB + assert(spark.sessionState.conf.getConfString( + "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") === + "67108864b") + assert(spark.sessionState.conf.getConfString( + "spark.sql.adaptive.advisoryPartitionSizeInBytes") === + "100") + assert(spark.sessionState.conf.getConfString( + "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") === + "100") + } + + sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1") + assert(spark.sessionState.conf.getConfString( + "spark.sql.adaptive.advisoryPartitionSizeInBytes") === + "1") + assert(!spark.sessionState.conf.contains( + "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes")) + + sql("SET a=1") + assert(spark.sessionState.conf.getConfString("a") === "1") + + sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum") + assert(!spark.sessionState.conf.contains( + "spark.sql.adaptive.coalescePartitions.minPartitionNum")) + assert(!spark.sessionState.conf.contains( + "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum")) + + sql("RESET a") + assert(!spark.sessionState.conf.contains("a")) + } + } + + test("final stage config isolation") { + def checkPartitionNum( + sqlString: String, previousPartitionNum: Int, finalPartitionNum: Int): Unit = { + val df = sql(sqlString) + df.collect() + val shuffleReaders = collect(df.queryExecution.executedPlan) { + case customShuffleReader: CustomShuffleReaderExec => customShuffleReader + } + assert(shuffleReaders.nonEmpty) + shuffleReaders.tail.foreach { readers => + assert(readers.partitionSpecs.length === previousPartitionNum) + } + assert(shuffleReaders.head.partitionSpecs.length === finalPartitionNum) + assert(df.rdd.partitions.length === finalPartitionNum) + } + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.SHUFFLE_PARTITIONS.key -> "3", + KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true", + "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1", + "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") { + + // use loop to double check final stage config doesn't affect the sql query each other + (1 to 3).foreach { _ => + checkPartitionNum( + "SELECT c1, count(*) FROM t1 GROUP BY c1", + 1, + 1) + + checkPartitionNum( + "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2", + 3, + 1) + + checkPartitionNum( + "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1", + 3, + 1) + + checkPartitionNum( + """ + | SELECT /*+ REPARTITION */ + | t1.c1, count(*) FROM t1 + | JOIN t2 ON t1.c2 = t2.c2 + | JOIN t3 ON t1.c1 = t3.c1 + | GROUP BY t1.c1 + |""".stripMargin, + 3, + 1) + + // one shuffle reader + checkPartitionNum( + """ + | SELECT /*+ BROADCAST(t1) */ + | t1.c1, t2.c2 FROM t1 + | JOIN t2 ON t1.c2 = t2.c2 + | DISTRIBUTE BY c1 + |""".stripMargin, + 1, + 1) + } + } + } +} diff --git a/kyuubi-assembly/pom.xml b/kyuubi-assembly/pom.xml index 1e7257edd..44b60fe5c 100644 --- a/kyuubi-assembly/pom.xml +++ b/kyuubi-assembly/pom.xml @@ -101,4 +101,16 @@ - \ No newline at end of file + + + kyuubi-extension-spark_3.1 + + + org.apache.kyuubi + kyuubi-extension-spark_3.1 + ${project.version} + + + + + diff --git a/pom.xml b/pom.xml index 78407a40a..2747fba9b 100644 --- a/pom.xml +++ b/pom.xml @@ -1584,5 +1584,15 @@ true + + + kyuubi-extension-spark_3.1 + + 3.1.1 + + + dev/kyuubi-extension-spark_3.1 + +