[KYUUBI #631] Add kyuubi-spark-extensions module
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> The added sql module structure looks like: ``` kyuubi | - dev | - kyuubi-extension-spark_3.1 ``` This PR mainly adds 3 features: * merging small files automatically (including the dynamic partition insertion case) * inserting a shuffle node before Join to make AQE `OptimizeSkewedJoin` work * stage-level config isolation in AQE Note that the sql rules depend on the Apache Spark interface, so we need to make the sql module version-independent. Currently, this PR only supports Spark 3.1.1. Due to the version issue, we currently need to check and deploy this extension manually. ### _How was this patch tested?_ Added new tests. Closes #631 from ulysses-you/add-sql-module. Closes #631 2cf12f1 [ulysses-you] version cfbf72c [ulysses-you] address comment 7740ca6 [ulysses-you] module name 0f723eb [ulysses-you] workflow 45c23d8 [ulysses-you] line 80378f5 [ulysses-you] assembly 95528aa [ulysses-you] move module 5fe5d87 [ulysses-you] license 6578440 [ulysses-you] init work Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
bb9e8a50e5
commit
43f40dcb8a
4
.github/workflows/master.yml
vendored
4
.github/workflows/master.yml
vendored
@ -35,8 +35,8 @@ jobs:
|
||||
profiles:
|
||||
- ''
|
||||
- '-Pspark-3.0 -Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.1 -Dspark.archive.name=spark-3.1.1-bin-hadoop2.7.tgz -Dmaven.plugin.scalatest.exclude.tags=org.apache.kyuubi.tags.ExtendedSQLTest,org.apache.kyuubi.tags.DataLakeTest'
|
||||
- '-Pspark-3.1'
|
||||
- '-Pspark-3.1 -Pspark-hadoop-3.2'
|
||||
- '-Pspark-3.1 -Pkyuubi-sql-spark_3.1'
|
||||
- '-Pspark-3.1 -Pkyuubi-sql-spark_3.1 -Pspark-hadoop-3.2'
|
||||
- '-Pspark-3.2-snapshot -pl :kyuubi-spark-sql-engine,:kyuubi-common,:kyuubi-ha,:kyuubi-zookeeper -Dmaven.plugin.scalatest.exclude.tags=org.apache.kyuubi.tags.ExtendedSQLTest,org.apache.kyuubi.tags.DataLakeTest'
|
||||
- '-DwildcardSuites=org.apache.kyuubi.operation.tpcds.TPCDSOutputSchemaSuite,org.apache.kyuubi.operation.tpcds.TPCDSDDLSuite -Dmaven.plugin.scalatest.exclude.tags=""'
|
||||
env:
|
||||
|
||||
106
dev/kyuubi-extension-spark_3.1/pom.xml
Normal file
106
dev/kyuubi-extension-spark_3.1/pom.xml
Normal file
@ -0,0 +1,106 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~    http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kyuubi</artifactId>
        <groupId>org.apache.kyuubi</groupId>
        <version>1.2.0-SNAPSHOT</version>
        <!-- Module lives two levels below the repository root (dev/kyuubi-extension-spark_3.1). -->
        <relativePath>../../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Spark version is encoded in the artifactId because the extension compiles
         against Spark 3.1 internals and is not binary-compatible across Spark versions. -->
    <artifactId>kyuubi-extension-spark_3.1</artifactId>
    <packaging>jar</packaging>
    <name>Kyuubi Project Dev Spark Extensions</name>

    <dependencies>
        <!-- Spark compile-time dependencies: scope `provided` because the extension
             runs inside an existing Spark distribution that supplies these jars. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-runtime</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- Spark test-jar dependencies: reuse Spark's test utilities
             (e.g. shared SparkSession fixtures) in this module's tests. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
            <version>${scalatest.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Place classes under target/scala-<version>/ like sbt does, so the
             output layout matches the other Scala cross-built modules. -->
        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    </build>
</project>
|
||||
@ -0,0 +1,172 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.sql
|
||||
|
||||
import java.util.Random
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, Multiply, Rand}
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
|
||||
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
|
||||
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
|
||||
import org.apache.spark.sql.internal.StaticSQLConf
|
||||
import org.apache.spark.sql.types.IntegerType
|
||||
|
||||
import org.apache.kyuubi.sql.RepartitionBeforeWriteHelper._
|
||||
|
||||
/**
 * For datasource tables there are two commands that can write data to a table:
 *  1. InsertIntoHadoopFsRelationCommand
 *  2. CreateDataSourceTableAsSelectCommand
 * This rule inserts a repartition node between the write command and its query.
 */
case class RepartitionBeforeWrite(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE)) {
      addRepartition(plan)
    } else {
      plan
    }
  }

  private def addRepartition(plan: LogicalPlan): LogicalPlan = plan match {
    case i @ InsertIntoHadoopFsRelationCommand(_, sp, _, pc, bucket, _, _, query, _, _, _, _)
        if query.resolved && bucket.isEmpty && canInsertRepartitionByExpression(query) =>
      // Partition columns without a static value are written dynamically.
      val dynamicPartitionColumns = pc.filterNot(attr => sp.contains(attr.name))
      i.copy(query = repartitionForWrite(dynamicPartitionColumns, query))

    case c @ CreateDataSourceTableAsSelectCommand(table, _, query, _)
        if query.resolved && table.bucketSpec.isEmpty &&
          canInsertRepartitionByExpression(query) =>
      // For CTAS every declared partition column is dynamic.
      val dynamicPartitionColumns =
        query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
      c.copy(query = repartitionForWrite(dynamicPartitionColumns, query))

    case _ => plan
  }

  // Wraps `query` in a RepartitionByExpression. With dynamic partitions the keys
  // are the partition columns plus an extra random expression that spreads a
  // skewed partition value over several tasks; otherwise no keys are used.
  private def repartitionForWrite(
      dynamicPartitionColumns: Seq[Expression], query: LogicalPlan): LogicalPlan = {
    val partitionKeys = if (dynamicPartitionColumns.isEmpty) {
      Seq.empty
    } else {
      dynamicPartitionColumns ++ dynamicPartitionExtraExpression(
        conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM))
    }
    RepartitionByExpression(
      partitionKeys,
      query,
      conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
  }
}
|
||||
|
||||
/**
 * For Hive tables there are two commands that can write data to a table:
 *  1. InsertIntoHiveTable
 *  2. CreateHiveTableAsSelectCommand
 * This rule inserts a repartition node between the write command and its query.
 */
case class RepartitionBeforeWriteHive(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Only meaningful when the Hive catalog is in use and the feature flag is on.
    val usingHiveCatalog = conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
    if (usingHiveCatalog && conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE)) {
      addRepartition(plan)
    } else {
      plan
    }
  }

  def addRepartition(plan: LogicalPlan): LogicalPlan = plan match {
    case i @ InsertIntoHiveTable(table, partition, query, _, _, _)
        if query.resolved && table.bucketSpec.isEmpty &&
          canInsertRepartitionByExpression(query) =>
      // Partition entries without a static value are dynamic partitions; map each
      // dynamic partition name back to the matching output attribute of the query.
      val dynamicPartitionColumns = partition.filter(_._2.isEmpty).keys
        .flatMap(name => query.output.find(_.name == name)).toSeq
      i.copy(query = repartitionForWrite(dynamicPartitionColumns, query))

    case c @ CreateHiveTableAsSelectCommand(table, query, _, _)
        if query.resolved && table.bucketSpec.isEmpty &&
          canInsertRepartitionByExpression(query) =>
      // For CTAS every declared partition column is dynamic.
      val dynamicPartitionColumns =
        query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
      c.copy(query = repartitionForWrite(dynamicPartitionColumns, query))

    case _ => plan
  }

  // Wraps `query` in a RepartitionByExpression. With dynamic partitions the keys
  // are the partition columns plus an extra random expression that spreads a
  // skewed partition value over several tasks; otherwise no keys are used.
  private def repartitionForWrite(
      dynamicPartitionColumns: Seq[Expression], query: LogicalPlan): LogicalPlan = {
    val partitionKeys = if (dynamicPartitionColumns.isEmpty) {
      Seq.empty
    } else {
      dynamicPartitionColumns ++ dynamicPartitionExtraExpression(
        conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM))
    }
    RepartitionByExpression(
      partitionKeys,
      query,
      conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
  }
}
|
||||
|
||||
object RepartitionBeforeWriteHelper {
  /**
   * Returns true when it is safe to place a RepartitionByExpression on top of
   * `plan`: adding one below a Limit/Sort would break its semantics, and adding
   * one on top of an existing (re)partition operation would be redundant.
   */
  def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan match {
    case Limit(_, _) | _: Sort | _: RepartitionByExpression | _: Repartition => false
    case _ => true
  }

  /**
   * An extra random partition expression in the range [0, partitionNumber).
   *
   * Dynamic partition insertion repartitions by the partition columns, which can
   * skew when one partition value holds most of the data; mixing in this random
   * component spreads each dynamic partition across several tasks.
   */
  def dynamicPartitionExtraExpression(partitionNumber: Int): Seq[Expression] = {
    val randomInPartitionRange = Multiply(
      new Rand(Literal(new Random().nextLong())),
      Literal(partitionNumber.toDouble))
    Seq(Cast(randomInPartitionRange, IntegerType))
  }
}
|
||||
@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.sql
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions.SortOrder
|
||||
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution}
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
|
||||
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
|
||||
|
||||
/**
 * Copy from Apache Spark `EnsureRequirements`
 * 1. remove reorder join predicates
 * 2. remove shuffle pruning
 *
 * NOTE(review): trimmed copy of Spark 3.1's physical-planning rule; keep the
 * statement order intact — it mirrors upstream and is order-sensitive.
 */
object KyuubiEnsureRequirements extends Rule[SparkPlan] {
  // Makes `operator`'s children satisfy its required child distributions and
  // orderings by inserting broadcast/shuffle exchanges and sorts as needed.
  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
    var children: Seq[SparkPlan] = operator.children
    assert(requiredChildDistributions.length == children.length)
    assert(requiredChildOrderings.length == children.length)

    // Ensure that the operator's children satisfy their output distribution requirements.
    children = children.zip(requiredChildDistributions).map {
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (child, distribution) =>
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(conf.numShufflePartitions)
        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
    }

    // Get the indexes of children which have specified distribution requirements and need to have
    // same number of partitions.
    val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
      case (UnspecifiedDistribution, _) => false
      case (_: BroadcastDistribution, _) => false
      case _ => true
    }.map(_._2)

    val childrenNumPartitions =
      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet

    // Children disagree on partition counts: pick one target count and re-shuffle
    // the children that do not already match it.
    if (childrenNumPartitions.size > 1) {
      // Get the number of partitions which is explicitly required by the distributions.
      val requiredNumPartitions = {
        val numPartitionsSet = childrenIndexes.flatMap {
          index => requiredChildDistributions(index).requiredNumPartitions
        }.toSet
        assert(numPartitionsSet.size <= 1,
          s"$operator have incompatible requirements of the number of partitions for its children")
        numPartitionsSet.headOption
      }

      // If there are non-shuffle children that satisfy the required distribution, we have
      // some tradeoffs when picking the expected number of shuffle partitions:
      // 1. We should avoid shuffling these children.
      // 2. We should have a reasonable parallelism.
      val nonShuffleChildrenNumPartitions =
        childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
          .map(_.outputPartitioning.numPartitions)
      val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
        if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
          // Here we pick the max number of partitions among these non-shuffle children.
          nonShuffleChildrenNumPartitions.max
        } else {
          // Here we pick the max number of partitions among these non-shuffle children as the
          // expected number of shuffle partitions. However, if it's smaller than
          // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
          // expected number of shuffle partitions.
          math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
        }
      } else {
        childrenNumPartitions.max
      }

      val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)

      children = children.zip(requiredChildDistributions).zipWithIndex.map {
        case ((child, distribution), index) if childrenIndexes.contains(index) =>
          if (child.outputPartitioning.numPartitions == targetNumPartitions) {
            child
          } else {
            val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
            child match {
              // If child is an exchange, we replace it with a new one having defaultPartitioning.
              case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
              case _ => ShuffleExchangeExec(defaultPartitioning, child)
            }
          }

        // Children without a specified distribution requirement are left untouched.
        case ((child, _), _) => child
      }
    }

    // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
    children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
      // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
      if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
        child
      } else {
        SortExec(requiredOrdering, global = false, child = child)
      }
    }

    operator.withNewChildren(children)
  }

  // Apply bottom-up so each operator's children are fixed before the operator itself.
  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case operator: SparkPlan =>
      ensureDistributionAndOrdering(operator)
  }
}
|
||||
@ -0,0 +1,241 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.sql
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.catalyst.plans.physical.Distribution
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
|
||||
import org.apache.spark.sql.execution.adaptive.QueryStageExec
|
||||
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
|
||||
import org.apache.spark.sql.execution.command.{ResetCommand, SetCommand}
|
||||
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
|
||||
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
import org.apache.kyuubi.sql.KyuubiSQLConf._
|
||||
|
||||
/**
 * Insert shuffle node before join if it doesn't exist to make `OptimizeSkewedJoin` works.
 */
object InsertShuffleNodeBeforeJoin extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = {
    // Gated by its own flag, and pointless without AQE.
    if (!conf.getConf(FORCE_SHUFFLE_BEFORE_JOIN) ||
        !conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) {
      return plan
    }

    val rewritten = insertShuffleBeforeJoin(plan)
    if (plan.fastEquals(rewritten)) {
      plan
    } else {
      // We inserted exchanges, so re-run the (trimmed) requirements pass to make
      // sure output partitioning and ordering are not broken.
      KyuubiEnsureRequirements.apply(rewritten)
    }
  }

  private def insertShuffleBeforeJoin(plan: SparkPlan): SparkPlan = plan transformUp {
    case smj @ SortMergeJoinExec(_, _, _, _, left, right, _) =>
      val shuffledLeft = checkAndInsertShuffle(smj.requiredChildDistribution.head, left)
      val shuffledRight = checkAndInsertShuffle(smj.requiredChildDistribution(1), right)
      smj.withNewChildren(shuffledLeft :: shuffledRight :: Nil)

    case shj: ShuffledHashJoinExec =>
      // Only wrap the side(s) that do not already sit on top of an Exchange.
      (shj.left.isInstanceOf[Exchange], shj.right.isInstanceOf[Exchange]) match {
        case (false, false) =>
          shj.withNewChildren(withShuffleExec(shj.requiredChildDistribution.head, shj.left) ::
            withShuffleExec(shj.requiredChildDistribution(1), shj.right) :: Nil)
        case (false, true) =>
          shj.withNewChildren(
            withShuffleExec(shj.requiredChildDistribution.head, shj.left) :: shj.right :: Nil)
        case (true, false) =>
          shj.withNewChildren(
            shj.left :: withShuffleExec(shj.requiredChildDistribution(1), shj.right) :: Nil)
        case (true, true) =>
          shj
      }
  }

  // For sort-merge join children: a sort already fed by an exchange or a query
  // stage needs nothing; a sort over an aggregate gets the shuffle inserted
  // below the sort; anything else is shuffled directly.
  private def checkAndInsertShuffle(
      distribution: Distribution, child: SparkPlan): SparkPlan = child match {
    case SortExec(_, _, _: Exchange, _) =>
      child
    case SortExec(_, _, _: QueryStageExec, _) =>
      child
    case sort @ SortExec(_, _, agg: BaseAggregateExec, _) =>
      sort.withNewChildren(withShuffleExec(distribution, agg) :: Nil)
    case _ =>
      withShuffleExec(distribution, child)
  }

  // Wraps `child` in a ShuffleExchangeExec that satisfies `distribution`.
  private def withShuffleExec(distribution: Distribution, child: SparkPlan): SparkPlan = {
    val numPartitions = distribution.requiredNumPartitions
      .getOrElse(conf.numShufflePartitions)
    ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
  }
}
|
||||
|
||||
|
||||
/**
 * This rule split stage into two parts:
 * 1. previous stage
 * 2. final stage
 * For final stage, we can inject extra config. It's useful if we use repartition to optimize
 * small files that needs bigger shuffle partition size than previous.
 *
 * Let's say we have a query with 3 stages, then the logical machine like:
 *
 * Set/Reset Command -> cleanup previousStage config if user set the spark config.
 * Query -> AQE -> stage1 -> preparation (use previousStage to overwrite spark config)
 *       -> AQE -> stage2 -> preparation (use spark config)
 *       -> AQE -> stage3 -> preparation (use finalStage config to overwrite spark config,
 *                                        store spark config to previousStage.)
 *
 * An example of the new finalStage config:
 *   `spark.sql.adaptive.advisoryPartitionSizeInBytes` ->
 *     `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`
 */
case class FinalStageConfigIsolation(session: SparkSession) extends Rule[SparkPlan] {
  import FinalStageConfigIsolation._

  override def apply(plan: SparkPlan): SparkPlan = {
    // this rule has no meaning without AQE
    if (!conf.getConf(FINAL_STAGE_CONFIG_ISOLATION) ||
        !conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) {
      return plan
    }

    if (isFinalStage(plan)) {
      // Final stage: apply every `spark.sql.finalStage.*` entry as its real
      // `spark.sql.*` key, after snapshotting the current value so it can be
      // restored when the next query's earlier stages run.
      session.conf.getAll.filter(_._1.startsWith(FINAL_STAGE_CONFIG_PREFIX)).foreach {
        case (k, v) =>
          val sparkConfigKey = s"spark.sql.${k.substring(FINAL_STAGE_CONFIG_PREFIX.length)}"
          val previousStageConfigKey =
            s"$PREVIOUS_STAGE_CONFIG_PREFIX${k.substring(FINAL_STAGE_CONFIG_PREFIX.length)}"
          // store the previous config only if we have not stored, to avoid some query only
          // have one stage that will overwrite real config.
          if (!session.sessionState.conf.contains(previousStageConfigKey)) {
            val originalValue = if (session.conf.getOption(sparkConfigKey).isDefined) {
              session.sessionState.conf.getConfString(sparkConfigKey)
            } else {
              // the default value of the config is None, so we need to use an internal tag
              INTERNAL_UNSET_CONFIG_TAG
            }
            logInfo(s"Store config: $sparkConfigKey to previousStage, " +
              s"original value: $originalValue ")
            session.sessionState.conf.setConfString(previousStageConfigKey, originalValue)
          }
          logInfo(s"For final stage: set $sparkConfigKey = $v.")
          session.conf.set(sparkConfigKey, v)
      }
    } else {
      // Non-final stage: restore every snapshotted `spark.sql.previousStage.*`
      // entry back to its real key (or unset it if it had no value before).
      session.conf.getAll.filter(_._1.startsWith(PREVIOUS_STAGE_CONFIG_PREFIX)).foreach {
        case (k, v) =>
          val sparkConfigKey = s"spark.sql.${k.substring(PREVIOUS_STAGE_CONFIG_PREFIX.length)}"
          logInfo(s"For previous stage: set $sparkConfigKey = $v.")
          if (v == INTERNAL_UNSET_CONFIG_TAG) {
            session.conf.unset(sparkConfigKey)
          } else {
            session.conf.set(sparkConfigKey, v)
          }
          // unset config so that we do not need to reset configs for every previous stage
          session.conf.unset(k)
      }
    }

    plan
  }

  /**
   * Currently formula depend on AQE in Spark 3.1.1, not sure it can work in future.
   *
   * Heuristic: a plan is the final stage when every shuffle/broadcast exchange has
   * already been materialized as a query stage (counts match), or when the plan
   * contains no shuffle at all.
   */
  private def isFinalStage(plan: SparkPlan): Boolean = {
    var shuffleNum = 0
    var broadcastNum = 0
    var queryStageNum = 0

    // Counts exchanges and query stages in `p`, descending into query stage
    // subplans by hand (they are leaf nodes for `transform`).
    def collectNumber(p: SparkPlan): SparkPlan = {
      p transform {
        case shuffle: ShuffleExchangeLike =>
          shuffleNum += 1
          shuffle

        case broadcast: BroadcastExchangeLike =>
          broadcastNum += 1
          broadcast

        // query stage is leaf node so we need to transform it manually
        case queryStage: QueryStageExec =>
          queryStageNum += 1
          collectNumber(queryStage.plan)
          queryStage
      }
    }
    collectNumber(plan)

    if (shuffleNum == 0) {
      // we do not care about broadcast stage here since it won't change partition number.
      true
    } else if (shuffleNum + broadcastNum == queryStageNum) {
      true
    } else {
      false
    }
  }
}
|
||||
object FinalStageConfigIsolation {
  final val SQL_PREFIX = "spark.sql."
  final val FINAL_STAGE_CONFIG_PREFIX = "spark.sql.finalStage."
  final val PREVIOUS_STAGE_CONFIG_PREFIX = "spark.sql.previousStage."
  // Sentinel stored in place of a value for configs that were unset, since we
  // cannot snapshot "no value" directly.
  final val INTERNAL_UNSET_CONFIG_TAG = "__INTERNAL_UNSET_CONFIG_TAG__"

  /**
   * Maps a `spark.sql.*` key to its `spark.sql.previousStage.*` snapshot key,
   * or None when the key is not a `spark.sql.` config.
   */
  def getPreviousStageConfigKey(configKey: String): Option[String] = {
    if (!configKey.startsWith(SQL_PREFIX)) {
      None
    } else {
      Some(PREVIOUS_STAGE_CONFIG_PREFIX + configKey.stripPrefix(SQL_PREFIX))
    }
  }
}
|
||||
|
||||
/**
 * When the user explicitly SETs or RESETs a `spark.sql.*` config, drop any stale
 * `spark.sql.previousStage.*` snapshot of it so the user's value is not later
 * clobbered by [[FinalStageConfigIsolation]]'s restore step.
 */
case class FinalStageConfigIsolationCleanRule(session: SparkSession) extends Rule[LogicalPlan] {
  import FinalStageConfigIsolation._

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case set @ SetCommand(Some((k, Some(_)))) if k.startsWith(SQL_PREFIX) =>
      checkAndUnsetPreviousStageConfig(k)
      set

    case reset @ ResetCommand(Some(k)) if k.startsWith(SQL_PREFIX) =>
      checkAndUnsetPreviousStageConfig(k)
      reset

    case other => other
  }

  // Removes the previousStage snapshot for `configKey`, if one exists.
  private def checkAndUnsetPreviousStageConfig(configKey: String): Unit = {
    getPreviousStageConfigKey(configKey).foreach { snapshotKey =>
      if (session.sessionState.conf.contains(snapshotKey)) {
        logInfo(s"For previous stage: unset $snapshotKey")
        session.conf.unset(snapshotKey)
      }
    }
  }
}
|
||||
@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.sql
|
||||
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.internal.SQLConf._
|
||||
|
||||
/**
 * Kyuubi-specific SQL configuration entries, registered via Spark's internal
 * `buildConf` helper (in scope through `import org.apache.spark.sql.internal.SQLConf._`).
 */
object KyuubiSQLConf {

  // Master switch for inserting a repartition node above write commands.
  val INSERT_REPARTITION_BEFORE_WRITE =
    buildConf("spark.sql.optimizer.insertRepartitionBeforeWrite.enabled")
      // typo fixed: "A approach" -> "An approach"
      .doc("Add repartition node at the top of plan. An approach of merging small files.")
      .version("1.2.0")
      .booleanConf
      .createWithDefault(true)

  val INSERT_REPARTITION_NUM =
    buildConf("spark.sql.optimizer.insertRepartitionNum")
      // BUG FIX: interpolate the config *key* (`.key`) rather than the ConfigEntry
      // object itself, which would render the entry's full toString in the doc text.
      .doc(s"The partition number if ${INSERT_REPARTITION_BEFORE_WRITE.key} is enabled. " +
        s"If AQE is disabled, the default value is ${SQLConf.SHUFFLE_PARTITIONS.key}. " +
        "If AQE is enabled, the default value is none that means depend on AQE.")
      .version("1.2.0")
      .intConf
      .createOptional

  val DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM =
    buildConf("spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum")
      .doc(s"The partition number of each dynamic partition if " +
        s"${INSERT_REPARTITION_BEFORE_WRITE.key} is enabled. " +
        "We will repartition by dynamic partition columns to reduce the small file but that " +
        "can cause data skew. This config is to extend the partition of dynamic " +
        "partition column to avoid skew but may generate some small files.")
      .version("1.2.0")
      .intConf
      .createWithDefault(100)

  val FORCE_SHUFFLE_BEFORE_JOIN =
    buildConf("spark.sql.optimizer.forceShuffleBeforeJoin.enabled")
      // grammar fixed: "make ... works" -> "make ... work"
      .doc("Ensure shuffle node exists before shuffled join (shj and smj) to make AQE " +
        "`OptimizeSkewedJoin` work (extra shuffle, multi table join).")
      .version("1.2.0")
      .booleanConf
      .createWithDefault(false)

  val FINAL_STAGE_CONFIG_ISOLATION =
    buildConf("spark.sql.optimizer.finalStageConfigIsolation.enabled")
      // fixed: missing space after "`spark.sql.finalStage.`." so the concatenated
      // doc no longer reads "...`.For example..."
      .doc("If true, the final stage support use different config with previous stage. The final " +
        "stage config key prefix should be `spark.sql.finalStage.`. " +
        "For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, " +
        "then the final stage config should be: " +
        "`spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`.")
      .version("1.2.0")
      .booleanConf
      .createWithDefault(false)
}
|
||||
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.sql
|
||||
|
||||
import org.apache.spark.sql.SparkSessionExtensions
|
||||
|
||||
// scalastyle:off line.size.limit
/**
 * Built on the Spark SQL Extension framework. To use this extension, follow these steps:
 * 1. move this jar into $SPARK_HOME/jars
 * 2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension`
 */
// scalastyle:on line.size.limit
|
||||
class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Post-hoc resolution rules: insert a repartition before writes
    // (datasource and Hive paths) — see RepartitionBeforeWrite* rules.
    extensions.injectPostHocResolutionRule(RepartitionBeforeWrite)
    extensions.injectPostHocResolutionRule(RepartitionBeforeWriteHive)
    // Clean cached previous-stage config entries when the user runs SET/RESET.
    extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
    // AQE query-stage preparation rules; InsertShuffleNodeBeforeJoin is
    // session-independent, FinalStageConfigIsolation needs the session.
    extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
    extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
  }
}
|
||||
@ -0,0 +1,364 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
|
||||
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec}
|
||||
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
|
||||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
||||
import org.apache.spark.sql.test.SQLTestData.TestData
|
||||
import org.apache.spark.sql.test.SQLTestUtils
|
||||
|
||||
import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
|
||||
|
||||
class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper {

  // Session under test; created in beforeAll with the Kyuubi extension installed.
  var _spark: SparkSession = _
  override def spark: SparkSession = _spark

  protected override def beforeAll(): Unit = {
    // Single-core local session with AQE enabled and the Kyuubi extension
    // registered, so every query in this suite runs through the injected rules.
    _spark = SparkSession.builder()
      .master("local[1]")
      .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
        "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
      .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.ui.enabled", "false")
      .enableHiveSupport()
      .getOrCreate()
    setupData()
    super.beforeAll()
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
    cleanupData()
    if (_spark != null) {
      _spark.stop()
    }
  }

  // Registers three temp views of TestData(c1, c2) with different sizes and
  // partitioning: t1 = 100 rows / 10 partitions, t2 = 10 rows / 5 partitions,
  // t3 = 50 rows / 2 partitions.
  private def setupData(): Unit = {
    val self = _spark
    import self.implicits._
    spark.sparkContext.parallelize(
      (1 to 100).map(i => TestData(i, i.toString)), 10)
      .toDF("c1", "c2").createOrReplaceTempView("t1")
    spark.sparkContext.parallelize(
      (1 to 10).map(i => TestData(i, i.toString)), 5)
      .toDF("c1", "c2").createOrReplaceTempView("t2")
    spark.sparkContext.parallelize(
      (1 to 50).map(i => TestData(i, i.toString)), 2)
      .toDF("c1", "c2").createOrReplaceTempView("t3")
  }

  private def cleanupData(): Unit = {
    spark.sql("DROP VIEW IF EXISTS t1")
    spark.sql("DROP VIEW IF EXISTS t2")
    spark.sql("DROP VIEW IF EXISTS t3")
  }

  test("check repartition exists") {
    // Asserts the analyzed plan contains exactly one RepartitionByExpression
    // whose partition number matches KyuubiSQLConf.INSERT_REPARTITION_NUM.
    def check(df: DataFrame): Unit = {
      assert(
        df.queryExecution.analyzed.collect {
          case r: RepartitionByExpression =>
            assert(r.optNumPartitions ===
              spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
            r
        }.size == 1
      )
    }

    // It's better to set the config explicitly in case we change the default value.
    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
      Seq("USING PARQUET", "").foreach { storage =>
        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
          check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " +
            "SELECT * FROM VALUES(1),(2) AS t(c1)"))
        }

        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
          check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
          check(sql("INSERT INTO TABLE tmp1 " +
            "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
        }

        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
        }

        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
        }
      }
    }
  }

  test("check repartition does not exists") {
    // Asserts no RepartitionByExpression node was inserted into the analyzed plan.
    def check(df: DataFrame): Unit = {
      assert(
        df.queryExecution.analyzed.collect {
          case r: RepartitionByExpression => r
        }.isEmpty
      )
    }

    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
      // test no write command
      check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
      check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))

      // test not supported plan: user-specified repartition hint, ORDER BY
      // and LIMIT all suppress the automatic repartition.
      withTable("tmp1") {
        sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)")
        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
          "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
          "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1"))
        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
          "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10"))
      }
    }

    // With the feature disabled, no repartition is inserted for any write.
    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") {
      Seq("USING PARQUET", "").foreach { storage =>
        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
          check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
            "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
        }

        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
          check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
        }

        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
        }

        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
        }
      }
    }
  }

  test("test dynamic partition write") {
    // Expects exactly one RepartitionByExpression over two expressions:
    // the dynamic partition column ("c2") and a cast of a multiply whose left
    // side is a "_nondeterministic"-named attribute (the skew-spreading term).
    def checkRepartitionExpression(df: DataFrame): Unit = {
      assert(df.queryExecution.analyzed.collect {
        case r: RepartitionByExpression if r.partitionExpressions.size == 2 =>
          assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2")
          assert(r.partitionExpressions(1).asInstanceOf[Cast].child.asInstanceOf[Multiply]
            .left.asInstanceOf[Attribute].name.startsWith("_nondeterministic"))
          r
      }.size == 1)
    }

    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
      Seq("USING PARQUET", "").foreach { storage =>
        withTable("tmp1") {
          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
          checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 "))
        }

        withTable("tmp1") {
          checkRepartitionExpression(
            sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 "))
        }
      }
    }
  }

  test("force shuffle before join") {
    // Runs the query once with AQE disabled to capture the expected answer,
    // then with AQE enabled, and counts ENSURE_REQUIREMENTS shuffle nodes in
    // the executed plan.
    def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
      var expectedResult: Seq[Row] = Seq.empty
      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        expectedResult = sql(sqlString).collect()
      }
      val df = sql(sqlString)
      checkAnswer(df, expectedResult)
      assert(
        collect(df.queryExecution.executedPlan) {
          case shuffle: ShuffleExchangeLike
            if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
        }.size == num)
    }

    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
      Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
        // positive case
        checkShuffleNodeNum(
          s"""
             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
             | JOIN t2 ON t1.c1 = t2.c1
             | JOIN t3 ON t1.c1 = t3.c1
             | """.stripMargin, 4)

        // negative case
        checkShuffleNodeNum(
          s"""
             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
             | JOIN t2 ON t1.c1 = t2.c1
             | JOIN t3 ON t1.c2 = t3.c2
             | """.stripMargin, 4)
      }

      checkShuffleNodeNum(
        """
          |SELECT t1.c1, t2.c1, t3.c2 from t1
          | JOIN t2 ON t1.c1 = t2.c1
          | JOIN (
          |  SELECT c2, count(*) FROM t1 GROUP BY c2
          | ) t3 ON t1.c1 = t3.c2
          | """.stripMargin, 5)

      checkShuffleNodeNum(
        """
          |SELECT t1.c1, t2.c1, t3.c1 from t1
          | JOIN t2 ON t1.c1 = t2.c1
          | JOIN (
          |  SELECT c1, count(*) FROM t1 GROUP BY c1
          | ) t3 ON t1.c1 = t3.c1
          | """.stripMargin, 5)
    }
  }

  test("final stage config set reset check") {
    withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
      "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
      // use a loop to double-check that the final-stage config of one query
      // does not leak into the next
      (1 to 3).foreach { _ =>
        sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect()
        assert(spark.sessionState.conf.getConfString(
          "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") ===
          FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG)
        assert(spark.sessionState.conf.getConfString(
          "spark.sql.adaptive.coalescePartitions.minPartitionNum") ===
          "1")
        assert(spark.sessionState.conf.getConfString(
          "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") ===
          "1")

        // 64MB
        assert(spark.sessionState.conf.getConfString(
          "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") ===
          "67108864b")
        assert(spark.sessionState.conf.getConfString(
          "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
          "100")
        assert(spark.sessionState.conf.getConfString(
          "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") ===
          "100")
      }

      // A user SET on the raw key must drop the previous-stage snapshot.
      sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1")
      assert(spark.sessionState.conf.getConfString(
        "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
        "1")
      assert(!spark.sessionState.conf.contains(
        "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes"))

      // Non-spark.sql.* keys are untouched by the cleanup rule.
      sql("SET a=1")
      assert(spark.sessionState.conf.getConfString("a") === "1")

      // RESET must clear both the raw key and its previous-stage snapshot.
      sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum")
      assert(!spark.sessionState.conf.contains(
        "spark.sql.adaptive.coalescePartitions.minPartitionNum"))
      assert(!spark.sessionState.conf.contains(
        "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum"))

      sql("RESET a")
      assert(!spark.sessionState.conf.contains("a"))
    }
  }

  test("final stage config isolation") {
    // Verifies earlier stages use the regular configs while the final (first
    // collected) shuffle reader uses the spark.sql.finalStage.* configs, by
    // comparing coalesced partition counts.
    def checkPartitionNum(
        sqlString: String, previousPartitionNum: Int, finalPartitionNum: Int): Unit = {
      val df = sql(sqlString)
      df.collect()
      val shuffleReaders = collect(df.queryExecution.executedPlan) {
        case customShuffleReader: CustomShuffleReaderExec => customShuffleReader
      }
      assert(shuffleReaders.nonEmpty)
      // head of `collect` is the topmost reader, i.e. the final stage
      shuffleReaders.tail.foreach { readers =>
        assert(readers.partitionSpecs.length === previousPartitionNum)
      }
      assert(shuffleReaders.head.partitionSpecs.length === finalPartitionNum)
      assert(df.rdd.partitions.length === finalPartitionNum)
    }

    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "3",
      KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
      "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1",
      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") {

      // use a loop to double-check that the final-stage config of one query
      // does not leak into the next
      (1 to 3).foreach { _ =>
        checkPartitionNum(
          "SELECT c1, count(*) FROM t1 GROUP BY c1",
          1,
          1)

        checkPartitionNum(
          "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2",
          3,
          1)

        checkPartitionNum(
          "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1",
          3,
          1)

        checkPartitionNum(
          """
            | SELECT /*+ REPARTITION */
            | t1.c1, count(*) FROM t1
            | JOIN t2 ON t1.c2 = t2.c2
            | JOIN t3 ON t1.c1 = t3.c1
            | GROUP BY t1.c1
            |""".stripMargin,
          3,
          1)

        // one shuffle reader
        checkPartitionNum(
          """
            | SELECT /*+ BROADCAST(t1) */
            | t1.c1, t2.c2 FROM t1
            | JOIN t2 ON t1.c2 = t2.c2
            | DISTRIBUTE BY c1
            |""".stripMargin,
          1,
          1)
      }
    }
  }
}
|
||||
@ -101,4 +101,16 @@
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>kyuubi-extension-spark_3.1</id>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.kyuubi</groupId>
|
||||
<artifactId>kyuubi-extension-spark_3.1</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
|
||||
10
pom.xml
10
pom.xml
@ -1584,5 +1584,15 @@
|
||||
<spark.archive.download.skip>true</spark.archive.download.skip>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>kyuubi-extension-spark_3.1</id>
|
||||
<properties>
|
||||
<spark.version>3.1.1</spark.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>dev/kyuubi-extension-spark_3.1</module>
|
||||
</modules>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user