From 9dfbd2b83064b854d2f21d6370e95bb382f0c482 Mon Sep 17 00:00:00 2001
From: h
Date: Mon, 13 Sep 2021 20:41:12 +0800
Subject: [PATCH] [KYUUBI #1086] Add maxHivePartitions Strategy to avoid
 scanning excessive hive partitions on partitioned table
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### _Why are the changes needed?_

This PR adds a strategy to avoid scanning an excessive number of hive
partitions, or scanning a partitioned table without a necessary partition
filter. It aborts the SQL query with an exception when the number of scanned
hive table partitions exceeds `spark.sql.watchdog.maxHivePartitions`, and it
shows the related hive table partition structure to help the user optimize
the SQL according to the given suggestion.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

截屏2021-09-12 下午2 30 20

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1086 from i7xh/watchdogMaxHivePartition.
Closes #1086 d535b57e [h] Fixed: scala style newline of end 774a486f [h] Resolved isssue: shrink unnesscary test data 0541dcc5 [h] Resovled issue: replace listPartitions with listPartitionNames 5a97c5d0 [h] Resolved issue: unify Exceed maxHivePartitions without partition filter 1581b297 [h] resovled the code review issues d0b0fc46 [h] Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table Authored-by: h Signed-off-by: ulysses-you --- .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 ++ .../kyuubi/sql/KyuubiSparkSQLExtension.scala | 4 + .../watchdog/KyuubiWatchDogException.scala | 25 ++++++ .../watchdog/MaxHivePartitionStrategy.scala | 78 +++++++++++++++++++ .../spark/sql/KyuubiExtensionSuite.scala | 40 +++++++++- 5 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala create mode 100644 dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index 5ccfe0790..cb21bcd40 100644 --- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -86,4 +86,12 @@ object KyuubiSQLConf { .version("1.4.0") .booleanConf .createWithDefault(true) + + val WATCHDOG_MAX_HIVEPARTITION = + buildConf("spark.sql.watchdog.maxHivePartitions") + .doc("Add maxHivePartitions Strategy to avoid scan excessive " + + "hive partitions on partitioned table, it's optional that works with defined") + .version("1.4.0") + .intConf + .createOptional } diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index 87df6e6cf..81100178b 100644 --- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -20,7 +20,10 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.SparkSessionExtensions import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification +import org.apache.kyuubi.sql.watchdog.MaxHivePartitionStrategy import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser} +import org.apache.kyuubi.sql.zorder.ResolveZorder +import org.apache.kyuubi.sql.zorder.ZorderSparkSqlExtensionsParser // scalastyle:off line.size.limit /** @@ -49,5 +52,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule) extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin) extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_)) + extensions.injectPlannerStrategy(MaxHivePartitionStrategy) } } diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala new file mode 100644 index 000000000..e84007de3 --- /dev/null +++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.kyuubi.sql.KyuubiSQLExtensionException + +final class MaxHivePartitionExceedException( + private val reason: String = "", + private val cause: Throwable = None.orNull) + extends KyuubiSQLExtensionException(reason, cause) diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala new file mode 100644 index 000000000..a808d4da6 --- /dev/null +++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.planning.ScanOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan + +import org.apache.kyuubi.sql.KyuubiSQLConf + +/** + * Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table + * 1 Check if scan exceed maxHivePartition + * 2 Check if Using partitionFilter on partitioned table + * This Strategy Add Planner Strategy after LogicalOptimizer + */ +case class MaxHivePartitionStrategy(session: SparkSession) + extends Strategy with SQLConfHelper { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION) match { + case Some(maxHivePartition) => plan match { + case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned => + relation.prunedPartitions match { + case Some(prunedPartitions) => if (prunedPartitions.size > maxHivePartition) { + throw new MaxHivePartitionExceedException( + s""" + |SQL job scan hive partition: ${prunedPartitions.size} + |exceed restrict of hive scan maxPartition $maxHivePartition + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${relation.tableMeta.qualifiedName} + |Owner: ${relation.tableMeta.owner} + |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")} + |""".stripMargin) + } else { + Nil + } + case _ => val totalPartitions = session + .sessionState.catalog.externalCatalog.listPartitionNames( + relation.tableMeta.database, relation.tableMeta.identifier.table) + if (totalPartitions.size > 
maxHivePartition) { + throw new MaxHivePartitionExceedException( + s""" + |Your SQL job scan a whole huge table without any partition filter, + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${relation.tableMeta.qualifiedName} + |Owner: ${relation.tableMeta.owner} + |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")} + |""".stripMargin) + } else { + Nil + } + } + case _ => Nil + } + case _ => Nil + } + } +} diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala index b9194d3bd..ccacbd4b7 100644 --- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala +++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.test.SQLTestUtils -import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf} -import org.apache.kyuubi.sql.KyuubiSQLExtensionException +import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf, KyuubiSQLExtensionException} +import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException import org.apache.kyuubi.sql.zorder.Zorder class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper { @@ -1584,4 +1584,40 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar checkZorderTable(true, "c4, c2, c1, c3", false, true) } } + + test("test watchdog with scan maxHivePartitions") { + withTable("test", "temp") { + sql( + s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + spark.range(0, 10, 1).selectExpr("id as col") + 
.createOrReplaceTempView("temp") + + for (part <- Range(0, 10)) { + sql( + s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") { + + sql("SELECT * FROM test where p=1").queryExecution.sparkPlan + + sql( + s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})") + .queryExecution.sparkPlan + + intercept[MaxHivePartitionExceedException]( + sql("SELECT * FROM test").queryExecution.sparkPlan) + + intercept[MaxHivePartitionExceedException](sql( + s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})") + .queryExecution.sparkPlan) + + } + } + } }