diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md index 8fa636c31..50bf087a8 100644 --- a/docs/extensions/engines/spark/rules.md +++ b/docs/extensions/engines/spark/rules.md @@ -73,7 +73,8 @@ Kyuubi provides some configs to make these feature easy to use. | spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0 | | spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 | | spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 | -| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 | +| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable maxPartitions Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 | +| spark.sql.watchdog.maxFileSize | none | Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. 
Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined | 1.8.0 | | spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0 | | spark.sql.optimizer.rebalanceBeforeZorder.enabled | false | When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance. Note that, this config only affects with Spark 3.3.x. | 1.6.0 | | spark.sql.optimizer.rebalanceZorderColumns.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns. Note that, this config only affects with Spark 3.3.x. | 1.6.0 | diff --git a/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index cd312de95..f952b56f3 100644 --- a/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.SparkSessionExtensions import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification -import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy} +import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy} // scalastyle:off line.size.limit /** @@ -40,6 +40,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { // watchdog extension 
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) - extensions.injectPlannerStrategy(MaxPartitionStrategy) + extensions.injectPlannerStrategy(MaxScanStrategy) } } diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index ef9da41be..97e777042 100644 --- a/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.SparkSessionExtensions -import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy} +import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy} // scalastyle:off line.size.limit /** @@ -38,6 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { // watchdog extension extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) - extensions.injectPlannerStrategy(MaxPartitionStrategy) + extensions.injectPlannerStrategy(MaxScanStrategy) } } diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index 0db9b3ab8..5d3464228 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions} -import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, 
MaxPartitionStrategy} +import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy} // scalastyle:off line.size.limit /** @@ -38,7 +38,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { // watchdog extension extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) - extensions.injectPlannerStrategy(MaxPartitionStrategy) + extensions.injectPlannerStrategy(MaxScanStrategy) extensions.injectQueryStagePrepRule(FinalStageResourceManager) extensions.injectQueryStagePrepRule(InjectCustomResourceProfile) diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index aeee45869..cb2f1130e 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.sql +import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ @@ -138,13 +139,23 @@ object KyuubiSQLConf { val WATCHDOG_MAX_PARTITIONS = buildConf("spark.sql.watchdog.maxPartitions") .doc("Set the max partition number when spark scans a data source. " + - "Enable MaxPartitionStrategy by specifying this configuration. " + + "Enable maxPartitions Strategy by specifying this configuration. " + "Add maxPartitions Strategy to avoid scan excessive partitions " + "on partitioned table, it's optional that works with defined") .version("1.4.0") .intConf .createOptional + val WATCHDOG_MAX_FILE_SIZE = + buildConf("spark.sql.watchdog.maxFileSize") + .doc("Set the maximum size in bytes of files when spark scans a data source. " + + "Enable maxFileSize Strategy by specifying this configuration. 
" + + "Add maxFileSize Strategy to avoid scan excessive size of files," + + " it's optional that works with defined") + .version("1.8.0") + .bytesConf(ByteUnit.BYTE) + .createOptional + val WATCHDOG_FORCED_MAXOUTPUTROWS = buildConf("spark.sql.watchdog.forcedMaxOutputRows") .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of non-limit query " + diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala index b3c58afdf..e44309192 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala @@ -23,3 +23,8 @@ final class MaxPartitionExceedException( private val reason: String = "", private val cause: Throwable = None.orNull) extends KyuubiSQLExtensionException(reason, cause) + +final class MaxFileSizeExceedException( + private val reason: String = "", + private val cause: Throwable = None.orNull) + extends KyuubiSQLExtensionException(reason, cause) diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala deleted file mode 100644 index 61ab07adf..000000000 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.sql.watchdog - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} -import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} -import org.apache.spark.sql.types.StructType - -import org.apache.kyuubi.sql.KyuubiSQLConf - -/** - * Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table - * 1 Check if scan exceed maxPartition - * 2 Check if Using partitionFilter on partitioned table - * This Strategy Add Planner Strategy after LogicalOptimizer - */ -case class MaxPartitionStrategy(session: SparkSession) - extends Strategy - with SQLConfHelper - with PruneFileSourcePartitionHelper { - override def apply(plan: LogicalPlan): Seq[SparkPlan] = { - val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS) - - if (maxScanPartitionsOpt.isDefined) { - checkRelationMaxPartitions(plan, maxScanPartitionsOpt.get) - } - Nil - } - - private def checkRelationMaxPartitions( - plan: LogicalPlan, - 
maxScanPartitions: Int): Unit = { - plan match { - case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned => - relation.prunedPartitions match { - case Some(prunedPartitions) => - if (prunedPartitions.size > maxScanPartitions) { - throw new MaxPartitionExceedException( - s""" - |SQL job scan hive partition: ${prunedPartitions.size} - |exceed restrict of hive scan maxPartition $maxScanPartitions - |You should optimize your SQL logical according partition structure - |or shorten query scope such as p_date, detail as below: - |Table: ${relation.tableMeta.qualifiedName} - |Owner: ${relation.tableMeta.owner} - |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")} - |""".stripMargin) - } - case _ => - val totalPartitions = session - .sessionState.catalog.externalCatalog.listPartitionNames( - relation.tableMeta.database, - relation.tableMeta.identifier.table) - if (totalPartitions.size > maxScanPartitions) { - throw new MaxPartitionExceedException( - s""" - |Your SQL job scan a whole huge table without any partition filter, - |You should optimize your SQL logical according partition structure - |or shorten query scope such as p_date, detail as below: - |Table: ${relation.tableMeta.qualifiedName} - |Owner: ${relation.tableMeta.owner} - |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")} - |""".stripMargin) - } - } - case ScanOperation( - _, - filters, - relation @ LogicalRelation( - fsRelation @ HadoopFsRelation( - fileIndex: InMemoryFileIndex, - partitionSchema, - _, - _, - _, - _), - _, - _, - _)) if fsRelation.partitionSchema.nonEmpty => - val (partitionKeyFilters, dataFilter) = - getPartitionKeyFiltersAndDataFilters( - fsRelation.sparkSession, - relation, - partitionSchema, - filters, - relation.output) - val prunedPartitionSize = fileIndex.listFiles( - partitionKeyFilters.toSeq, - dataFilter) - .size - if (prunedPartitionSize > maxScanPartitions) { - throw maxPartitionExceedError( - 
prunedPartitionSize, - maxScanPartitions, - relation.catalogTable, - fileIndex.rootPaths, - fsRelation.partitionSchema) - } - case ScanOperation( - _, - filters, - logicalRelation @ LogicalRelation( - fsRelation @ HadoopFsRelation( - catalogFileIndex: CatalogFileIndex, - partitionSchema, - _, - _, - _, - _), - _, - _, - _)) if fsRelation.partitionSchema.nonEmpty => - val (partitionKeyFilters, _) = - getPartitionKeyFiltersAndDataFilters( - fsRelation.sparkSession, - logicalRelation, - partitionSchema, - filters, - logicalRelation.output) - - val prunedPartitionSize = - catalogFileIndex.filterPartitions( - partitionKeyFilters.toSeq) - .partitionSpec() - .partitions - .size - if (prunedPartitionSize > maxScanPartitions) { - throw maxPartitionExceedError( - prunedPartitionSize, - maxScanPartitions, - logicalRelation.catalogTable, - catalogFileIndex.rootPaths, - fsRelation.partitionSchema) - } - case _ => - } - } - - def maxPartitionExceedError( - prunedPartitionSize: Int, - maxPartitionSize: Int, - tableMeta: Option[CatalogTable], - rootPaths: Seq[Path], - partitionSchema: StructType): Throwable = { - val truncatedPaths = - if (rootPaths.length > 5) { - rootPaths.slice(0, 5).mkString(",") + """... 
""" + (rootPaths.length - 5) + " more paths" - } else { - rootPaths.mkString(",") - } - - new MaxPartitionExceedException( - s""" - |SQL job scan data source partition: $prunedPartitionSize - |exceed restrict of data source scan maxPartition $maxPartitionSize - |You should optimize your SQL logical according partition structure - |or shorten query scope such as p_date, detail as below: - |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")} - |Owner: ${tableMeta.map(_.owner).getOrElse("")} - |RootPaths: $truncatedPaths - |Partition Structure: ${partitionSchema.map(_.name).mkString(", ")} - |""".stripMargin) - } -} diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala new file mode 100644 index 000000000..0ee693fcb --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+/**
+ * Planner strategy that guards table scans against the watchdog limits. It never produces
+ * a physical plan (it always returns Nil); it only throws when a scan exceeds a limit:
+ * 1. `spark.sql.watchdog.maxPartitions`: number of scanned partitions of a partitioned table
+ * 2. `spark.sql.watchdog.maxFileSize`: total size in bytes of the scanned files. Hive tables
+ *    are sized from table and partition statistics, file source tables from their file index.
+ * As a planner strategy it is applied after the logical optimizer.
+ *
+ * @param session the SparkSession whose external catalog is queried for Hive partitions
+ */
+case class MaxScanStrategy(session: SparkSession)
+  extends Strategy
+  with SQLConfHelper
+  with PruneFileSourcePartitionHelper {
+
+  /**
+   * Runs the watchdog checks when at least one of the two limits is configured.
+   *
+   * @return always Nil; this strategy never plans anything itself
+   */
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
+    val maxFileSizeOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE)
+    if (maxScanPartitionsOpt.isDefined || maxFileSizeOpt.isDefined) {
+      checkScan(plan, maxScanPartitionsOpt, maxFileSizeOpt)
+    }
+    Nil
+  }
+
+  /**
+   * Checks a single scan node against the configured limits. Handles Hive relations and file
+   * source relations backed by an InMemoryFileIndex or a CatalogFileIndex; any other plan is
+   * ignored. Most sizes and partition counts are `lazy val`s, so the catalog lookup or file
+   * listing behind them only runs when the corresponding limit is set.
+   *
+   * @throws MaxPartitionExceedException if more partitions are scanned than allowed
+   * @throws MaxFileSizeExceedException if more bytes are scanned than allowed
+   */
+  private def checkScan(
+      plan: LogicalPlan,
+      maxScanPartitionsOpt: Option[Int],
+      maxFileSizeOpt: Option[Long]): Unit = {
+    plan match {
+      case ScanOperation(_, _, relation: HiveTableRelation) =>
+        if (relation.isPartitioned) {
+          relation.prunedPartitions match {
+            case Some(prunedPartitions) =>
+              if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
+                throw new MaxPartitionExceedException(
+                  s"""
+                     |SQL job scan hive partition: ${prunedPartitions.size}
+                     |exceed restrict of hive scan maxPartition ${maxScanPartitionsOpt.get}
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                     |""".stripMargin)
+              }
+              // partitions without statistics contribute nothing to the size
+              lazy val scanFileSize = prunedPartitions.flatMap(_.stats).map(_.sizeInBytes).sum
+              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+                throw partTableMaxFileExceedError(
+                  scanFileSize,
+                  maxFileSizeOpt.get,
+                  Some(relation.tableMeta),
+                  prunedPartitions.flatMap(_.storage.locationUri).map(_.toString),
+                  relation.partitionCols.map(_.name))
+              }
+            case _ =>
+              // no pruned partition list on the relation: count every partition of the table
+              lazy val scanPartitions: Int = session
+                .sessionState.catalog.externalCatalog.listPartitionNames(
+                  relation.tableMeta.database,
+                  relation.tableMeta.identifier.table).size
+              if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
+                throw new MaxPartitionExceedException(
+                  s"""
+                     |Your SQL job scan a whole huge table without any partition filter,
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                     |""".stripMargin)
+              }
+
+              // prefer table-level statistics, else sum the statistics of all partitions
+              lazy val scanFileSize: BigInt =
+                relation.tableMeta.stats.map(_.sizeInBytes).getOrElse {
+                  session
+                    .sessionState.catalog.externalCatalog.listPartitions(
+                      relation.tableMeta.database,
+                      relation.tableMeta.identifier.table).flatMap(_.stats).map(_.sizeInBytes).sum
+                }
+              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+                throw new MaxFileSizeExceedException(
+                  s"""
+                     |Your SQL job scan a whole huge table without any partition filter,
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                     |""".stripMargin)
+              }
+          }
+        } else {
+          // unpartitioned Hive table: sized from table statistics, 0 when there are none
+          lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw nonPartTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              Some(relation.tableMeta))
+          }
+        }
+      case ScanOperation(
+            _,
+            filters,
+            relation @ LogicalRelation(
+              fsRelation @ HadoopFsRelation(
+                fileIndex: InMemoryFileIndex,
+                partitionSchema,
+                _,
+                _,
+                _,
+                _),
+              _,
+              _,
+              _)) =>
+        if (fsRelation.partitionSchema.nonEmpty) {
+          val (partitionKeyFilters, dataFilter) =
+            getPartitionKeyFiltersAndDataFilters(
+              fsRelation.sparkSession,
+              relation,
+              partitionSchema,
+              filters,
+              relation.output)
+          val prunedPartitions = fileIndex.listFiles(
+            partitionKeyFilters.toSeq,
+            dataFilter)
+          if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
+            throw maxPartitionExceedError(
+              prunedPartitions.size,
+              maxScanPartitionsOpt.get,
+              relation.catalogTable,
+              fileIndex.rootPaths,
+              fsRelation.partitionSchema)
+          }
+          lazy val scanFileSize = prunedPartitions.flatMap(_.files).map(_.getLen).sum
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw partTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              relation.catalogTable,
+              fileIndex.rootPaths.map(_.toString),
+              fsRelation.partitionSchema.map(_.name))
+          }
+        } else {
+          lazy val scanFileSize = fileIndex.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw nonPartTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              relation.catalogTable)
+          }
+        }
+      case ScanOperation(
+            _,
+            filters,
+            logicalRelation @ LogicalRelation(
+              fsRelation @ HadoopFsRelation(
+                catalogFileIndex: CatalogFileIndex,
+                partitionSchema,
+                _,
+                _,
+                _,
+                _),
+              _,
+              _,
+              _)) =>
+        if (fsRelation.partitionSchema.nonEmpty) {
+          val (partitionKeyFilters, _) =
+            getPartitionKeyFiltersAndDataFilters(
+              fsRelation.sparkSession,
+              logicalRelation,
+              partitionSchema,
+              filters,
+              logicalRelation.output)
+
+          val fileIndex = catalogFileIndex.filterPartitions(
+            partitionKeyFilters.toSeq)
+
+          lazy val prunedPartitionSize = fileIndex.partitionSpec().partitions.size
+          if (maxScanPartitionsOpt.exists(_ < prunedPartitionSize)) {
+            throw maxPartitionExceedError(
+              prunedPartitionSize,
+              maxScanPartitionsOpt.get,
+              logicalRelation.catalogTable,
+              catalogFileIndex.rootPaths,
+              fsRelation.partitionSchema)
+          }
+
+          lazy val scanFileSize = fileIndex
+            .listFiles(Nil, Nil).flatMap(_.files).map(_.getLen).sum
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw partTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              logicalRelation.catalogTable,
+              catalogFileIndex.rootPaths.map(_.toString),
+              fsRelation.partitionSchema.map(_.name))
+          }
+        } else {
+          lazy val scanFileSize = catalogFileIndex.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw nonPartTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              logicalRelation.catalogTable)
+          }
+        }
+      case _ =>
+    }
+  }
+
+  /**
+   * Builds the error raised when a file source scan reads too many partitions.
+   *
+   * @param prunedPartitionSize number of partitions the scan would read
+   * @param maxPartitionSize configured `spark.sql.watchdog.maxPartitions`
+   * @param tableMeta catalog entry of the scanned table, if it has one
+   * @param rootPaths root paths of the scanned file index
+   * @param partitionSchema partition columns of the scanned relation
+   * @return the exception to throw; it is not thrown here
+   */
+  def maxPartitionExceedError(
+      prunedPartitionSize: Int,
+      maxPartitionSize: Int,
+      tableMeta: Option[CatalogTable],
+      rootPaths: Seq[Path],
+      partitionSchema: StructType): Throwable = {
+    val truncatedPaths = truncatePaths(rootPaths.map(_.toString))
+
+    new MaxPartitionExceedException(
+      s"""
+         |SQL job scan data source partition: $prunedPartitionSize
+         |exceed restrict of data source scan maxPartition $maxPartitionSize
+         |You should optimize your SQL logical according partition structure
+         |or shorten query scope such as p_date, detail as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |RootPaths: $truncatedPaths
+         |Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
+         |""".stripMargin)
+  }
+
+  /**
+   * Builds the error raised when a partitioned table scan reads too many bytes.
+   *
+   * @param scanFileSize size in bytes the scan would read
+   * @param maxFileSize configured `spark.sql.watchdog.maxFileSize`
+   * @param tableMeta catalog entry of the scanned table, if it has one
+   * @param rootPaths locations being scanned, already rendered as strings
+   * @param partitions names of the partition columns
+   */
+  private def partTableMaxFileExceedError(
+      scanFileSize: Number,
+      maxFileSize: Long,
+      tableMeta: Option[CatalogTable],
+      rootPaths: Seq[String],
+      partitions: Seq[String]): Throwable = {
+    val truncatedPaths = truncatePaths(rootPaths)
+
+    new MaxFileSizeExceedException(
+      s"""
+         |SQL job scan file size in bytes: $scanFileSize
+         |exceed restrict of table scan maxFileSize $maxFileSize
+         |You should optimize your SQL logical according partition structure
+         |or shorten query scope such as p_date, detail as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |RootPaths: $truncatedPaths
+         |Partition Structure: ${partitions.mkString(", ")}
+         |""".stripMargin)
+  }
+
+  /**
+   * Builds the error raised when an unpartitioned table scan reads too many bytes.
+   *
+   * @param scanFileSize size in bytes the scan would read
+   * @param maxFileSize configured `spark.sql.watchdog.maxFileSize`
+   * @param tableMeta catalog entry of the scanned table, if it has one
+   */
+  private def nonPartTableMaxFileExceedError(
+      scanFileSize: Number,
+      maxFileSize: Long,
+      tableMeta: Option[CatalogTable]): Throwable = {
+    // CatalogTable.location throws when no location is set, which would replace the
+    // watchdog error with an unrelated one; read the optional storage URI instead.
+    val location = tableMeta.flatMap(_.storage.locationUri).map(_.toString).getOrElse("")
+    new MaxFileSizeExceedException(
+      s"""
+         |SQL job scan file size in bytes: $scanFileSize
+         |exceed restrict of table scan maxFileSize $maxFileSize
+         |detail as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |Location: $location
+         |""".stripMargin)
+  }
+
+  /**
+   * Renders paths for an error message: all of them when there are at most five, otherwise
+   * the first five followed by the number of omitted paths.
+   */
+  private def truncatePaths(paths: Seq[String]): String = {
+    if (paths.length > 5) {
+      paths.slice(0, 5).mkString(",") + """... """ + (paths.length - 5) + " more paths"
+    } else {
+      paths.mkString(",")
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index e6ecd28c9..6254829f2 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
-import org.apache.kyuubi.sql.watchdog.MaxPartitionExceedException
+import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
 
 trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
   override protected def beforeAll(): Unit = {
@@ -477,4 +482,132 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  /**
+   * Shared assertions for the maxFileSize watchdog. Expects a partitioned relation `test`
+   * (partition column `p`) and an unpartitioned relation `test_non_part` to be queryable.
+   *
+   * @param tableSize total size in bytes of the files backing `test`
+   * @param nonPartTableSize total size in bytes of the files backing `test_non_part`
+   */
+  private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit = {
+    // a limit equal to the table size is not exceeded, so the full scan is allowed
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
+      checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
+    }
+
+    // with half the table size as limit, scans of a few partitions still plan while
+    // scans of most or all partitions are rejected
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
+      sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
+
+      sql(s"SELECT * FROM test WHERE p in (${Range(0, 3).toList.mkString(",")})")
+        .queryExecution.sparkPlan
+
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test where p != 1").queryExecution.sparkPlan)
+
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
+
+      intercept[MaxFileSizeExceedException](sql(
+        s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
+        .queryExecution.sparkPlan)
+    }
+
+    // unpartitioned relation: the exact size passes, one byte less is rejected
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
+      checkAnswer(sql("SELECT count(*) FROM test_non_part"), Row(10000) :: Nil)
+    }
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize - 1).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
+    }
+  }
+
+  test("watchdog with scan maxFileSize -- hive") {
+    Seq(false).foreach { convertMetastoreParquet =>
+      withTable("test", "test_non_part", "temp") {
+        spark.range(10000).selectExpr("id as col")
+          .createOrReplaceTempView("temp")
+
+        // partitioned table
+        sql(
+          s"""
+             |CREATE TABLE test(i int)
+             |PARTITIONED BY (p int)
+             |STORED AS parquet""".stripMargin)
+        for (part <- Range(0, 10)) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+        }
+
+        val tablePath = new File(spark.sessionState.catalog.externalCatalog
+          .getTable("default", "test").location)
+        val tableSize = FileUtils.listFiles(tablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(tableSize > 0)
+
+        // non-partitioned table
+        sql(
+          s"""
+             |CREATE TABLE test_non_part(i int)
+             |STORED AS parquet""".stripMargin)
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE test_non_part
+             |select col from temp""".stripMargin)
+        // the Hive path sizes an unpartitioned table from its catalog statistics
+        sql("ANALYZE TABLE test_non_part COMPUTE STATISTICS")
+
+        val nonPartTablePath = new File(spark.sessionState.catalog.externalCatalog
+          .getTable("default", "test_non_part").location)
+        val nonPartTableSize = FileUtils.listFiles(nonPartTablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(nonPartTableSize > 0)
+
+        // check
+        withSQLConf("spark.sql.hive.convertMetastoreParquet" -> convertMetastoreParquet.toString) {
+          checkMaxFileSize(tableSize, nonPartTableSize)
+        }
+      }
+    }
+  }
+
+  test("watchdog with scan maxFileSize -- data source") {
+    withTempDir { dir =>
+      withTempView("test", "test_non_part") {
+        // partitioned table
+        val tablePath = new File(dir, "test")
+        spark.range(10).selectExpr("id", "id as p")
+          .write
+          .partitionBy("p")
+          .mode("overwrite")
+          .parquet(tablePath.getCanonicalPath)
+        spark.read.load(tablePath.getCanonicalPath).createOrReplaceTempView("test")
+
+        val tableSize = FileUtils.listFiles(tablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(tableSize > 0)
+
+        // non-partitioned table
+        val nonPartTablePath = new File(dir, "test_non_part")
+        spark.range(10000).selectExpr("id", "id as p")
+          .write
+          .mode("overwrite")
+          .parquet(nonPartTablePath.getCanonicalPath)
+        spark.read.load(nonPartTablePath.getCanonicalPath).createOrReplaceTempView("test_non_part")
+
+        val nonPartTableSize = FileUtils.listFiles(nonPartTablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(nonPartTableSize > 0)
+
+        // check
+        checkMaxFileSize(tableSize, nonPartTableSize)
+      }
+    }
+  }
 }