[KYUUBI #4641] Add MaxFileSizeStrategy to limit max scan file size

### _Why are the changes needed?_

Add MaxFileSizeStrategy to limit max scan file size.
close #4641

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4642 from wForget/KYUUBI-4641.

Closes #4641

14a680f8e [wforget] comment
d2a393d97 [wforget] comment
b1ef4c52c [wforget] fix
d9e94bd8e [wforget] fix style
8a9121131 [wforget] use optional value
094eb61e3 [wforget] combine
89e2cb4d0 [wforget] [KYUUBI-4641] Add MaxFileSizeStrategy to limit max scan file size

Authored-by: wforget <643348094@qq.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
wforget 2023-04-23 17:51:44 +08:00 committed by ulyssesyou
parent d0a7ca4ba8
commit 19d5a9a371
GPG Key ID: 4C500BC62D576766
9 changed files with 450 additions and 194 deletions


@@ -73,7 +73,8 @@ Kyuubi provides some configs to make these feature easy to use.
| spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0 |
| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 |
| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 |
| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 |
| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 |
| spark.sql.watchdog.maxFileSize | none | Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined | 1.8.0 |
| spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0 |
| spark.sql.optimizer.rebalanceBeforeZorder.enabled | false | When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance. Note that, this config only affects with Spark 3.3.x. | 1.6.0 |
| spark.sql.optimizer.rebalanceZorderColumns.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns. Note that, this config only affects with Spark 3.3.x. | 1.6.0 |
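The two watchdog thresholds in the table (`spark.sql.watchdog.maxPartitions` and `spark.sql.watchdog.maxFileSize`) are optional: when a config is unset, the corresponding check is disabled. A minimal, hypothetical Python sketch (not Kyuubi code) of that guard semantics:

```python
# Illustrative sketch only: unset limits (None) disable the check,
# mirroring how the watchdog treats missing optional configs.

def check_scan(scanned_partitions, scanned_bytes,
               max_partitions=None, max_file_size=None):
    """Raise ValueError if an enabled limit is exceeded."""
    if max_partitions is not None and scanned_partitions > max_partitions:
        raise ValueError(
            f"scan of {scanned_partitions} partitions exceeds "
            f"maxPartitions={max_partitions}")
    if max_file_size is not None and scanned_bytes > max_file_size:
        raise ValueError(
            f"scan of {scanned_bytes} bytes exceeds "
            f"maxFileSize={max_file_size}")
```

With no limits configured, any scan passes; a scan is rejected only when it exceeds a limit that is actually set.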


@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy}
// scalastyle:off line.size.limit
/**
@@ -40,6 +40,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
// watchdog extension
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxPartitionStrategy)
extensions.injectPlannerStrategy(MaxScanStrategy)
}
}


@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy}
// scalastyle:off line.size.limit
/**
@@ -38,6 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
// watchdog extension
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxPartitionStrategy)
extensions.injectPlannerStrategy(MaxScanStrategy)
}
}


@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy}
// scalastyle:off line.size.limit
/**
@@ -38,7 +38,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
// watchdog extension
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxPartitionStrategy)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectQueryStagePrepRule(FinalStageResourceManager)
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)


@@ -17,6 +17,7 @@
package org.apache.kyuubi.sql
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
@@ -138,13 +139,23 @@ object KyuubiSQLConf {
val WATCHDOG_MAX_PARTITIONS =
buildConf("spark.sql.watchdog.maxPartitions")
.doc("Set the max partition number when spark scans a data source. " +
"Enable MaxPartitionStrategy by specifying this configuration. " +
"Enable maxPartitions Strategy by specifying this configuration. " +
"Add maxPartitions Strategy to avoid scan excessive partitions " +
"on partitioned table, it's optional that works with defined")
.version("1.4.0")
.intConf
.createOptional
val WATCHDOG_MAX_FILE_SIZE =
buildConf("spark.sql.watchdog.maxFileSize")
.doc("Set the maximum size in bytes of files when spark scans a data source. " +
"Enable maxFileSize Strategy by specifying this configuration. " +
"Add maxFileSize Strategy to avoid scan excessive size of files," +
" it's optional that works with defined")
.version("1.8.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
val WATCHDOG_FORCED_MAXOUTPUTROWS =
buildConf("spark.sql.watchdog.forcedMaxOutputRows")
.doc("Add ForcedMaxOutputRows rule to avoid huge output rows of non-limit query " +
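`bytesConf(ByteUnit.BYTE)` lets `spark.sql.watchdog.maxFileSize` be given either as a plain byte count or with a size suffix, normalized to bytes. A simplified, hypothetical Python sketch of that normalization (an assumption: Spark's real parser also accepts forms such as `10mb`; this sketch handles single-letter suffixes only):

```python
# Illustrative sketch only, not Spark's implementation: normalize a size
# string to bytes, treating a bare number as bytes (ByteUnit.BYTE).

_UNITS = {"b": 1, "k": 1 << 10, "m": 1 << 20, "g": 1 << 30, "t": 1 << 40}

def to_bytes(value: str) -> int:
    v = value.strip().lower()
    if v[-1].isdigit():
        return int(v)  # plain number: already bytes
    return int(v[:-1]) * _UNITS[v[-1]]
```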


@@ -23,3 +23,8 @@ final class MaxPartitionExceedException(
private val reason: String = "",
private val cause: Throwable = None.orNull)
extends KyuubiSQLExtensionException(reason, cause)
final class MaxFileSizeExceedException(
private val reason: String = "",
private val cause: Throwable = None.orNull)
extends KyuubiSQLExtensionException(reason, cause)


@@ -1,185 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.watchdog
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table
* 1 Check if scan exceed maxPartition
* 2 Check if Using partitionFilter on partitioned table
* This Strategy Add Planner Strategy after LogicalOptimizer
*/
case class MaxPartitionStrategy(session: SparkSession)
extends Strategy
with SQLConfHelper
with PruneFileSourcePartitionHelper {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
if (maxScanPartitionsOpt.isDefined) {
checkRelationMaxPartitions(plan, maxScanPartitionsOpt.get)
}
Nil
}
private def checkRelationMaxPartitions(
plan: LogicalPlan,
maxScanPartitions: Int): Unit = {
plan match {
case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
relation.prunedPartitions match {
case Some(prunedPartitions) =>
if (prunedPartitions.size > maxScanPartitions) {
throw new MaxPartitionExceedException(
s"""
|SQL job scan hive partition: ${prunedPartitions.size}
|exceed restrict of hive scan maxPartition $maxScanPartitions
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
case _ =>
val totalPartitions = session
.sessionState.catalog.externalCatalog.listPartitionNames(
relation.tableMeta.database,
relation.tableMeta.identifier.table)
if (totalPartitions.size > maxScanPartitions) {
throw new MaxPartitionExceedException(
s"""
|Your SQL job scan a whole huge table without any partition filter,
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
}
case ScanOperation(
_,
filters,
relation @ LogicalRelation(
fsRelation @ HadoopFsRelation(
fileIndex: InMemoryFileIndex,
partitionSchema,
_,
_,
_,
_),
_,
_,
_)) if fsRelation.partitionSchema.nonEmpty =>
val (partitionKeyFilters, dataFilter) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
relation,
partitionSchema,
filters,
relation.output)
val prunedPartitionSize = fileIndex.listFiles(
partitionKeyFilters.toSeq,
dataFilter)
.size
if (prunedPartitionSize > maxScanPartitions) {
throw maxPartitionExceedError(
prunedPartitionSize,
maxScanPartitions,
relation.catalogTable,
fileIndex.rootPaths,
fsRelation.partitionSchema)
}
case ScanOperation(
_,
filters,
logicalRelation @ LogicalRelation(
fsRelation @ HadoopFsRelation(
catalogFileIndex: CatalogFileIndex,
partitionSchema,
_,
_,
_,
_),
_,
_,
_)) if fsRelation.partitionSchema.nonEmpty =>
val (partitionKeyFilters, _) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
logicalRelation,
partitionSchema,
filters,
logicalRelation.output)
val prunedPartitionSize =
catalogFileIndex.filterPartitions(
partitionKeyFilters.toSeq)
.partitionSpec()
.partitions
.size
if (prunedPartitionSize > maxScanPartitions) {
throw maxPartitionExceedError(
prunedPartitionSize,
maxScanPartitions,
logicalRelation.catalogTable,
catalogFileIndex.rootPaths,
fsRelation.partitionSchema)
}
case _ =>
}
}
def maxPartitionExceedError(
prunedPartitionSize: Int,
maxPartitionSize: Int,
tableMeta: Option[CatalogTable],
rootPaths: Seq[Path],
partitionSchema: StructType): Throwable = {
val truncatedPaths =
if (rootPaths.length > 5) {
rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
} else {
rootPaths.mkString(",")
}
new MaxPartitionExceedException(
s"""
|SQL job scan data source partition: $prunedPartitionSize
|exceed restrict of data source scan maxPartition $maxPartitionSize
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
|Owner: ${tableMeta.map(_.owner).getOrElse("")}
|RootPaths: $truncatedPaths
|Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
|""".stripMargin)
}
}
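The error message above truncates `rootPaths` to the first five entries before printing. The same formatting logic as a small Python sketch (illustrative only):

```python
# Illustrative sketch of the root-path truncation in the error messages:
# show at most `limit` paths, then summarize how many were omitted.

def truncate_paths(paths, limit=5):
    if len(paths) > limit:
        shown = ",".join(paths[:limit])
        return f"{shown}... {len(paths) - limit} more paths"
    return ",".join(paths)
```

This keeps the exception text readable even for tables with many root paths.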


@@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.watchdog
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* Add MaxScanStrategy to avoid scan excessive partitions or files
* 1. Check if scan exceed maxPartition of partitioned table
* 2. Check if scan exceed maxFileSize (calculated by hive table and partition statistics)
* This Strategy Add Planner Strategy after LogicalOptimizer
* @param session
*/
case class MaxScanStrategy(session: SparkSession)
extends Strategy
with SQLConfHelper
with PruneFileSourcePartitionHelper {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
val maxFileSizeOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE)
if (maxScanPartitionsOpt.isDefined || maxFileSizeOpt.isDefined) {
checkScan(plan, maxScanPartitionsOpt, maxFileSizeOpt)
}
Nil
}
private def checkScan(
plan: LogicalPlan,
maxScanPartitionsOpt: Option[Int],
maxFileSizeOpt: Option[Long]): Unit = {
plan match {
case ScanOperation(_, _, relation: HiveTableRelation) =>
if (relation.isPartitioned) {
relation.prunedPartitions match {
case Some(prunedPartitions) =>
if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
throw new MaxPartitionExceedException(
s"""
|SQL job scan hive partition: ${prunedPartitions.size}
|exceed restrict of hive scan maxPartition ${maxScanPartitionsOpt.get}
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
lazy val scanFileSize = prunedPartitions.flatMap(_.stats).map(_.sizeInBytes).sum
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw partTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
Some(relation.tableMeta),
prunedPartitions.flatMap(_.storage.locationUri).map(_.toString),
relation.partitionCols.map(_.name))
}
case _ =>
lazy val scanPartitions: Int = session
.sessionState.catalog.externalCatalog.listPartitionNames(
relation.tableMeta.database,
relation.tableMeta.identifier.table).size
if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
throw new MaxPartitionExceedException(
s"""
|Your SQL job scan a whole huge table without any partition filter,
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
lazy val scanFileSize: BigInt =
relation.tableMeta.stats.map(_.sizeInBytes).getOrElse {
session
.sessionState.catalog.externalCatalog.listPartitions(
relation.tableMeta.database,
relation.tableMeta.identifier.table).flatMap(_.stats).map(_.sizeInBytes).sum
}
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw new MaxFileSizeExceedException(
s"""
|Your SQL job scan a whole huge table without any partition filter,
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${relation.tableMeta.qualifiedName}
|Owner: ${relation.tableMeta.owner}
|Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
|""".stripMargin)
}
}
} else {
lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw nonPartTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
Some(relation.tableMeta))
}
}
case ScanOperation(
_,
filters,
relation @ LogicalRelation(
fsRelation @ HadoopFsRelation(
fileIndex: InMemoryFileIndex,
partitionSchema,
_,
_,
_,
_),
_,
_,
_)) =>
if (fsRelation.partitionSchema.nonEmpty) {
val (partitionKeyFilters, dataFilter) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
relation,
partitionSchema,
filters,
relation.output)
val prunedPartitions = fileIndex.listFiles(
partitionKeyFilters.toSeq,
dataFilter)
if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
throw maxPartitionExceedError(
prunedPartitions.size,
maxScanPartitionsOpt.get,
relation.catalogTable,
fileIndex.rootPaths,
fsRelation.partitionSchema)
}
lazy val scanFileSize = prunedPartitions.flatMap(_.files).map(_.getLen).sum
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw partTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
relation.catalogTable,
fileIndex.rootPaths.map(_.toString),
fsRelation.partitionSchema.map(_.name))
}
} else {
lazy val scanFileSize = fileIndex.sizeInBytes
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw nonPartTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
relation.catalogTable)
}
}
case ScanOperation(
_,
filters,
logicalRelation @ LogicalRelation(
fsRelation @ HadoopFsRelation(
catalogFileIndex: CatalogFileIndex,
partitionSchema,
_,
_,
_,
_),
_,
_,
_)) =>
if (fsRelation.partitionSchema.nonEmpty) {
val (partitionKeyFilters, _) =
getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession,
logicalRelation,
partitionSchema,
filters,
logicalRelation.output)
val fileIndex = catalogFileIndex.filterPartitions(
partitionKeyFilters.toSeq)
lazy val prunedPartitionSize = fileIndex.partitionSpec().partitions.size
if (maxScanPartitionsOpt.exists(_ < prunedPartitionSize)) {
throw maxPartitionExceedError(
prunedPartitionSize,
maxScanPartitionsOpt.get,
logicalRelation.catalogTable,
catalogFileIndex.rootPaths,
fsRelation.partitionSchema)
}
lazy val scanFileSize = fileIndex
.listFiles(Nil, Nil).flatMap(_.files).map(_.getLen).sum
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw partTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
logicalRelation.catalogTable,
catalogFileIndex.rootPaths.map(_.toString),
fsRelation.partitionSchema.map(_.name))
}
} else {
lazy val scanFileSize = catalogFileIndex.sizeInBytes
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw nonPartTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
logicalRelation.catalogTable)
}
}
case _ =>
}
}
def maxPartitionExceedError(
prunedPartitionSize: Int,
maxPartitionSize: Int,
tableMeta: Option[CatalogTable],
rootPaths: Seq[Path],
partitionSchema: StructType): Throwable = {
val truncatedPaths =
if (rootPaths.length > 5) {
rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
} else {
rootPaths.mkString(",")
}
new MaxPartitionExceedException(
s"""
|SQL job scan data source partition: $prunedPartitionSize
|exceed restrict of data source scan maxPartition $maxPartitionSize
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
|Owner: ${tableMeta.map(_.owner).getOrElse("")}
|RootPaths: $truncatedPaths
|Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
|""".stripMargin)
}
private def partTableMaxFileExceedError(
scanFileSize: Number,
maxFileSize: Long,
tableMeta: Option[CatalogTable],
rootPaths: Seq[String],
partitions: Seq[String]): Throwable = {
val truncatedPaths =
if (rootPaths.length > 5) {
rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
} else {
rootPaths.mkString(",")
}
new MaxFileSizeExceedException(
s"""
|SQL job scan file size in bytes: $scanFileSize
|exceed restrict of table scan maxFileSize $maxFileSize
|You should optimize your SQL logical according partition structure
|or shorten query scope such as p_date, detail as below:
|Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
|Owner: ${tableMeta.map(_.owner).getOrElse("")}
|RootPaths: $truncatedPaths
|Partition Structure: ${partitions.mkString(", ")}
|""".stripMargin)
}
private def nonPartTableMaxFileExceedError(
scanFileSize: Number,
maxFileSize: Long,
tableMeta: Option[CatalogTable]): Throwable = {
new MaxFileSizeExceedException(
s"""
|SQL job scan file size in bytes: $scanFileSize
|exceed restrict of table scan maxFileSize $maxFileSize
|detail as below:
|Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
|Owner: ${tableMeta.map(_.owner).getOrElse("")}
|Location: ${tableMeta.map(_.location).getOrElse("")}
|""".stripMargin)
}
}
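A recurring pattern in `MaxScanStrategy` is pairing an optional limit with a `lazy val` scan size, so the potentially expensive size computation only runs when a limit is configured and `maxFileSizeOpt.exists(_ < scanFileSize)` forces it. The same idea in a hypothetical Python sketch, using a zero-argument callable for laziness:

```python
# Illustrative sketch of the Option + lazy-evaluation pattern:
# compute_size is only invoked when a limit is actually configured.

def exceeds(limit, compute_size):
    """limit: int or None; compute_size: zero-arg callable evaluated lazily."""
    return limit is not None and limit < compute_size()
```

Because `and` short-circuits, a disabled watchdog never pays for listing files or partitions, matching the intent of the `lazy val scanFileSize` declarations above.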


@@ -17,10 +17,15 @@
package org.apache.spark.sql
import java.io.File
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.kyuubi.sql.watchdog.MaxPartitionExceedException
import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
override protected def beforeAll(): Unit = {
@@ -477,4 +482,120 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
}
}
}
private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit = {
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
}
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
sql(s"SELECT * FROM test WHERE p in (${Range(0, 3).toList.mkString(",")})")
.queryExecution.sparkPlan
intercept[MaxFileSizeExceedException](
sql("SELECT * FROM test where p != 1").queryExecution.sparkPlan)
intercept[MaxFileSizeExceedException](
sql("SELECT * FROM test").queryExecution.sparkPlan)
intercept[MaxFileSizeExceedException](sql(
s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
.queryExecution.sparkPlan)
}
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
checkAnswer(sql("SELECT count(*) FROM test_non_part"), Row(10000) :: Nil)
}
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize - 1).toString) {
intercept[MaxFileSizeExceedException](
sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
}
}
test("watchdog with scan maxFileSize -- hive") {
Seq(false).foreach { convertMetastoreParquet =>
withTable("test", "test_non_part", "temp") {
spark.range(10000).selectExpr("id as col")
.createOrReplaceTempView("temp")
// partitioned table
sql(
s"""
|CREATE TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS parquet""".stripMargin)
for (part <- Range(0, 10)) {
sql(
s"""
|INSERT OVERWRITE TABLE test PARTITION (p='$part')
|select col from temp""".stripMargin)
}
val tablePath = new File(spark.sessionState.catalog.externalCatalog
.getTable("default", "test").location)
val tableSize = FileUtils.listFiles(tablePath, Array("parquet"), true).asScala
.map(_.length()).sum
assert(tableSize > 0)
// non-partitioned table
sql(
s"""
|CREATE TABLE test_non_part(i int)
|STORED AS parquet""".stripMargin)
sql(
s"""
|INSERT OVERWRITE TABLE test_non_part
|select col from temp""".stripMargin)
sql("ANALYZE TABLE test_non_part COMPUTE STATISTICS")
val nonPartTablePath = new File(spark.sessionState.catalog.externalCatalog
.getTable("default", "test_non_part").location)
val nonPartTableSize = FileUtils.listFiles(nonPartTablePath, Array("parquet"), true).asScala
.map(_.length()).sum
assert(nonPartTableSize > 0)
// check
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> convertMetastoreParquet.toString) {
checkMaxFileSize(tableSize, nonPartTableSize)
}
}
}
}
test("watchdog with scan maxFileSize -- data source") {
withTempDir { dir =>
withTempView("test", "test_non_part") {
// partitioned table
val tablePath = new File(dir, "test")
spark.range(10).selectExpr("id", "id as p")
.write
.partitionBy("p")
.mode("overwrite")
.parquet(tablePath.getCanonicalPath)
spark.read.load(tablePath.getCanonicalPath).createOrReplaceTempView("test")
val tableSize = FileUtils.listFiles(tablePath, Array("parquet"), true).asScala
.map(_.length()).sum
assert(tableSize > 0)
// non-partitioned table
val nonPartTablePath = new File(dir, "test_non_part")
spark.range(10000).selectExpr("id", "id as p")
.write
.mode("overwrite")
.parquet(nonPartTablePath.getCanonicalPath)
spark.read.load(nonPartTablePath.getCanonicalPath).createOrReplaceTempView("test_non_part")
val nonPartTableSize = FileUtils.listFiles(nonPartTablePath, Array("parquet"), true).asScala
.map(_.length()).sum
assert(nonPartTableSize > 0)
// check
checkMaxFileSize(tableSize, nonPartTableSize)
}
}
}
}
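The suites above measure table size by recursively summing the lengths of `.parquet` files under the table location (`FileUtils.listFiles(tablePath, Array("parquet"), true).asScala.map(_.length()).sum`). A rough Python equivalent of that measurement, illustrative only and not part of the test suite:

```python
# Illustrative sketch: recursively sum the sizes of files with a given
# extension under a directory, like the FileUtils.listFiles pattern above.
import os

def dir_size(root, ext=".parquet"):
    total = 0
    for dirpath, _, files in os.walk(root):
        for name in files:
            if name.endswith(ext):
                total += os.path.getsize(os.path.join(dirpath, name))
    return total
```

Filtering by extension matters here: Spark writes `_SUCCESS` markers and checksum files alongside the data, and only the data files should count toward the watchdog threshold being tested.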