[KYUUBI #1443] MaxPartitionStrategy supports file source based tables.

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Fix [comment](https://github.com/apache/incubator-kyuubi/pull/1086#discussion_r707103617)

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1443 from cfmcgrady/max-partition-strategy.

Closes #1443

e071ebc2 [Fu Chen] typo
f7d4ea16 [Fu Chen] refactor
97a36016 [Fu Chen] truncate RootPaths
c3fe93aa [Fu Chen] remove dpp suite
9075e5ba [Fu Chen] refactor
5a71f459 [Fu Chen] trigger GitHub actions
17df25d2 [Fu Chen] fix style
4dfe6a7e [Fu Chen] fix style
0b2df063 [Fu Chen] update docs
305c6e88 [Fu Chen] fix style
33b9b3d5 [Fu Chen] fix style
885e8ed7 [Fu Chen] data source support

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
Fu Chen 2021-11-25 17:34:17 +08:00 committed by ulysses-you
parent c9273eb381
commit 78c9cf3756
8 changed files with 291 additions and 117 deletions


@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxHivePartitionStrategy}
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxPartitionStrategy}
// scalastyle:off line.size.limit
/**
@@ -40,6 +40,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive)
extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
extensions.injectPlannerStrategy(MaxPartitionStrategy)
}
}


@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql.watchdog
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
final class MaxHivePartitionExceedException(
final class MaxPartitionExceedException(
private val reason: String = "",
private val cause: Throwable = None.orNull)
extends KyuubiSQLExtensionException(reason, cause)


@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.watchdog
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table
* 1 Check if scan exceed maxHivePartition
* 2 Check if Using partitionFilter on partitioned table
* This Strategy Add Planner Strategy after LogicalOptimizer
*/
case class MaxHivePartitionStrategy(session: SparkSession)
extends Strategy with SQLConfHelper {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION) match {
case Some(maxHivePartition) => plan match {
case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
relation.prunedPartitions match {
case Some(prunedPartitions) =>
if (prunedPartitions.size > maxHivePartition) {
throw new MaxHivePartitionExceedException(
s"""
|SQL job scan hive partition: ${prunedPartitions.size}
|exceed restrict of hive scan maxPartition $maxHivePartition
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
|""".stripMargin)
} else {
Nil
}
case _ =>
val totalPartitions = session
.sessionState.catalog.externalCatalog.listPartitionNames(
relation.tableMeta.database,
relation.tableMeta.identifier.table)
if (totalPartitions.size > maxHivePartition) {
throw new MaxHivePartitionExceedException(
s"""
|Your SQL job scan a whole huge table without any partition filter,
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
|""".stripMargin)
} else {
Nil
}
}
case _ => Nil
}
case _ => Nil
}
}
}


@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.watchdog
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* Add MaxPartitionStrategy to avoid scanning excessive partitions of a partitioned table.
* 1. Check whether the scan exceeds maxPartitions
* 2. Check whether a partition filter is applied on the partitioned table
* This rule is injected as a planner strategy after the logical optimizer.
*/
case class MaxPartitionStrategy(session: SparkSession)
extends Strategy
with SQLConfHelper
with PruneFileSourcePartitionHelper {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
if (maxScanPartitionsOpt.isDefined) {
checkRelationMaxPartitions(plan, maxScanPartitionsOpt.get)
}
Nil
}
private def checkRelationMaxPartitions(
plan: LogicalPlan,
maxScanPartitions: Int): Unit = {
plan match {
case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
relation.prunedPartitions match {
case Some(prunedPartitions) =>
if (prunedPartitions.size > maxScanPartitions) {
throw new MaxPartitionExceedException(
s"""
|SQL job scans hive partitions: ${prunedPartitions.size},
|exceeding the hive scan maxPartitions limit of $maxScanPartitions.
|You should optimize your SQL logic according to the partition structure,
|or shorten the query scope (e.g. by p_date); details below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
case _ =>
val totalPartitions = session
.sessionState.catalog.externalCatalog.listPartitionNames(
relation.tableMeta.database,
relation.tableMeta.identifier.table)
if (totalPartitions.size > maxScanPartitions) {
throw new MaxPartitionExceedException(
s"""
|Your SQL job scans a whole huge table without any partition filter.
|You should optimize your SQL logic according to the partition structure,
|or shorten the query scope (e.g. by p_date); details below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
}
case ScanOperation(
_,
filters,
relation @ LogicalRelation(
fsRelation @ HadoopFsRelation(
fileIndex: InMemoryFileIndex,
partitionSchema,
_,
_,
_,
_),
_,
_,
_)) if fsRelation.partitionSchemaOption.isDefined =>
val (partitionKeyFilters, dataFilter) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
relation,
partitionSchema,
filters,
relation.output)
val prunedPartitionSize = fileIndex.listFiles(
partitionKeyFilters.toSeq,
dataFilter)
.size
if (prunedPartitionSize > maxScanPartitions) {
throw maxPartitionExceedError(
prunedPartitionSize,
maxScanPartitions,
relation.catalogTable,
fileIndex.rootPaths,
fsRelation.partitionSchema)
}
case ScanOperation(
_,
filters,
logicalRelation @ LogicalRelation(
fsRelation @ HadoopFsRelation(
catalogFileIndex: CatalogFileIndex,
partitionSchema,
_,
_,
_,
_),
_,
_,
_)) if fsRelation.partitionSchemaOption.isDefined =>
val (partitionKeyFilters, _) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
logicalRelation,
partitionSchema,
filters,
logicalRelation.output)
val prunedPartitionSize =
catalogFileIndex.filterPartitions(
partitionKeyFilters.toSeq)
.partitionSpec()
.partitions
.size
if (prunedPartitionSize > maxScanPartitions) {
throw maxPartitionExceedError(
prunedPartitionSize,
maxScanPartitions,
logicalRelation.catalogTable,
catalogFileIndex.rootPaths,
fsRelation.partitionSchema)
}
case _ =>
}
}
def maxPartitionExceedError(
prunedPartitionSize: Int,
maxPartitionSize: Int,
tableMeta: Option[CatalogTable],
rootPaths: Seq[Path],
partitionSchema: StructType): Throwable = {
val truncatedPaths =
if (rootPaths.length > 5) {
rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
} else {
rootPaths.mkString(",")
}
new MaxPartitionExceedException(
s"""
|SQL job scans data source partitions: $prunedPartitionSize,
|exceeding the data source scan maxPartitions limit of $maxPartitionSize.
|You should optimize your SQL logic according to the partition structure,
|or shorten the query scope (e.g. by p_date); details below:
|Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
|Owner: ${tableMeta.map(_.owner).getOrElse("")}
|RootPaths: $truncatedPaths
|Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
|""".stripMargin)
}
}
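The rootPaths truncation in `maxPartitionExceedError` keeps error messages bounded: at most five paths are printed, followed by a count of the omitted ones. A minimal Python sketch of that logic (a hypothetical helper mirroring the Scala above, not part of the patch):

```python
def truncate_paths(root_paths, limit=5):
    """Render at most `limit` paths; summarize how many were omitted."""
    if len(root_paths) > limit:
        shown = ",".join(root_paths[:limit])
        return f"{shown}... {len(root_paths) - limit} more paths"
    return ",".join(root_paths)
```

This keeps the exception message readable even when an InMemoryFileIndex was built over hundreds of root paths.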


@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType
trait PruneFileSourcePartitionHelper extends PredicateHelper {
def getPartitionKeyFiltersAndDataFilters(
sparkSession: SparkSession,
relation: LeafNode,
partitionSchema: StructType,
filters: Seq[Expression],
output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = {
val normalizedFilters = DataSourceStrategy.normalizeExprs(
filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)),
output)
val partitionColumns =
relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
f.references.subsetOf(partitionSet))
val extraPartitionFilter =
dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
(ExpressionSet(partitionFilters ++ extraPartitionFilter), dataFilters)
}
}
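At its core, `getPartitionKeyFiltersAndDataFilters` splits the normalized filters by whether they reference only partition columns, then salvages extra partition predicates from the data filters. A simplified Python model of the split step (plain tuples stand in for Catalyst expressions; the names are illustrative, not Spark's API):

```python
def split_filters(filters, partition_cols):
    """Split (expr, referenced_columns) filter pairs into partition filters
    (referencing only partition columns) and data filters (everything else)."""
    pset = set(partition_cols)
    partition_filters = [f for f in filters if set(f[1]) <= pset]
    data_filters = [f for f in filters if not set(f[1]) <= pset]
    return partition_filters, data_filters
```

In the real helper, a filter like `p = i` lands in the data filters, but `extractPredicatesWithinOutputSet` may still recover a partition-only predicate from it for pruning.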


@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
import org.apache.kyuubi.sql.watchdog.MaxPartitionExceedException
class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
override protected def beforeAll(): Unit = {
@@ -31,38 +31,60 @@ class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
case class LimitAndExpected(limit: Int, expected: Int)
val limitAndExpecteds = List(LimitAndExpected(1, 1), LimitAndExpected(11, 10))
test("test watchdog with scan maxHivePartitions") {
withTable("test", "temp") {
sql(
s"""
|CREATE TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS textfile""".stripMargin)
spark.range(0, 10, 1).selectExpr("id as col")
.createOrReplaceTempView("temp")
private def checkMaxPartition: Unit = {
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "100") {
checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
}
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "5") {
sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
for (part <- Range(0, 10)) {
sql(s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
.queryExecution.sparkPlan
intercept[MaxPartitionExceedException](
sql("SELECT * FROM test where p != 1").queryExecution.sparkPlan)
intercept[MaxPartitionExceedException](
sql("SELECT * FROM test").queryExecution.sparkPlan)
intercept[MaxPartitionExceedException](sql(
s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
.queryExecution.sparkPlan)
}
}
test("watchdog with scan maxPartitions -- hive") {
Seq("textfile", "parquet").foreach { format =>
withTable("test", "temp") {
sql(
s"""
|INSERT OVERWRITE TABLE test PARTITION (p='$part')
|select col from temp""".stripMargin)
|CREATE TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS $format""".stripMargin)
spark.range(0, 10, 1).selectExpr("id as col")
.createOrReplaceTempView("temp")
for (part <- Range(0, 10)) {
sql(
s"""
|INSERT OVERWRITE TABLE test PARTITION (p='$part')
|select col from temp""".stripMargin)
}
checkMaxPartition
}
}
}
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
sql(
s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
.queryExecution.sparkPlan
intercept[MaxHivePartitionExceedException](
sql("SELECT * FROM test").queryExecution.sparkPlan)
intercept[MaxHivePartitionExceedException](sql(
s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
.queryExecution.sparkPlan)
test("watchdog with scan maxPartitions -- data source") {
withTempDir { dir =>
withTempView("test") {
spark.range(10).selectExpr("id", "id as p")
.write
.partitionBy("p")
.mode("overwrite")
.save(dir.getCanonicalPath)
spark.read.load(dir.getCanonicalPath).createOrReplaceTempView("test")
checkMaxPartition
}
}
}


@@ -97,10 +97,12 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(true)
val WATCHDOG_MAX_HIVEPARTITION =
buildConf("spark.sql.watchdog.maxHivePartitions")
.doc("Add maxHivePartitions Strategy to avoid scan excessive " +
"hive partitions on partitioned table, it's optional that works with defined")
val WATCHDOG_MAX_PARTITIONS =
buildConf("spark.sql.watchdog.maxPartitions")
.doc("Set the maximum partition number when Spark scans a data source. " +
"Setting this configuration enables MaxPartitionStrategy, which prevents " +
"scanning excessive partitions of a partitioned table. " +
"It is optional and takes effect only when set.")
.version("1.4.0")
.intConf
.createOptional
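As a usage sketch, the new configuration can be set per session; once the pruned partition count exceeds the limit, planning fails with `MaxPartitionExceedException` (the table name and threshold below are illustrative):

```sql
-- Allow each scan to touch at most 100 partitions
SET spark.sql.watchdog.maxPartitions=100;

-- Prunes to a single partition: plans normally
SELECT * FROM sales WHERE p_date = '2021-11-25';

-- Full scan of a table with more than 100 partitions:
-- fails at planning time with MaxPartitionExceedException
SELECT * FROM sales;
```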


@@ -76,4 +76,4 @@ spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the fin
spark.sql.analyzer.classification.enabled | false | When true, allows the Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimize configurations dynamically. | 1.4.0
spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0
spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0
spark.sql.watchdog.maxHivePartitions | none | Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table, it's optional that works with defined. | 1.4.0
spark.sql.watchdog.maxPartitions | none | Set the maximum partition number when Spark scans a data source. Setting this configuration enables MaxPartitionStrategy, which prevents scanning excessive partitions of a partitioned table. It is optional and takes effect only when set. | 1.4.0