[KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
# 🔍 Description

## Issue References 🔗

Currently, MaxScanStrategy can be adopted to limit the max scan file size for some data sources, such as Hive. This PR enhances MaxScanStrategy to also support DataSource V2.

## Describe Your Solution 🔧

Obtain statistics about the files scanned through the DataSource V2 API.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists

## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5852 from zhaohehuhu/dev-1213.

Closes #6315

3c5b0c276 [hezhao2] reformat
fb113d625 [hezhao2] disable the rule that checks the maxPartitions for dsv2
acc358732 [hezhao2] disable the rule that checks the maxPartitions for dsv2
c8399a021 [hezhao2] fix header
70c845bee [hezhao2] add UTs
3a0739686 [hezhao2] add ut
4d26ce131 [hezhao2] reformat
f87cb072c [hezhao2] reformat
b307022b8 [hezhao2] move code to Spark 3.5
73258c2ae [hezhao2] fix unused import
cf893a0e1 [hezhao2] drop reflection for loading iceberg class
dc128bc8e [hezhao2] refactor code
661834cce [hezhao2] revert code
6061f42ab [hezhao2] delete IcebergSparkPlanHelper
5f1c3c082 [hezhao2] fix
b15652f05 [hezhao2] remove iceberg dependency
fe620ca92 [hezhao2] enable MaxScanStrategy when accessing iceberg datasource

Authored-by: hezhao2 <hezhao2@cisco.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
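As a minimal sketch of how the feature is exercised end to end, assuming the Kyuubi Spark SQL extension jar is on the classpath and that `dsv2_catalog.db.events` is a hypothetical DataSource V2 table:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.sql.KyuubiSQLConf

val spark = SparkSession.builder()
  .master("local[*]")
  // Registers MaxScanStrategy along with Kyuubi's other planner rules.
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .getOrCreate()

// Cap the total file size a single query may scan, here 100 MiB. With this
// patch the cap also applies to DataSource V2 relations, not only Hive tables.
spark.conf.set(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key, (100L * 1024 * 1024).toString)

// A query whose estimated scan size exceeds the cap now fails at planning
// time with MaxFileSizeExceedException instead of launching the scan.
spark.sql("SELECT * FROM dsv2_catalog.db.events").show()
```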
commit 8edcb005ee (parent 962de72681)
In the Spark 3.5 extension's `MaxScanStrategy`:

```diff
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession)
             logicalRelation.catalogTable)
         }
       }
+    case ScanOperation(
+        _,
+        _,
+        _,
+        relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
+      val table = relation.relation.table
+      if (table.partitioning().nonEmpty) {
+        val partitionColumnNames = table.partitioning().map(_.describe())
+        val stats = relation.computeStats()
+        lazy val scanFileSize = stats.sizeInBytes
+        if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+          throw new MaxFileSizeExceedException(
+            s"""
+               |SQL job scan file size in bytes: $scanFileSize
+               |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
+               |You should optimize your SQL logical according partition structure
+               |or shorten query scope such as p_date, detail as below:
+               |Table: ${table.name()}
+               |Partition Structure: ${partitionColumnNames.mkString(",")}
+               |""".stripMargin)
+        }
+      } else {
+        val stats = relation.computeStats()
+        lazy val scanFileSize = stats.sizeInBytes
+        if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+          throw new MaxFileSizeExceedException(
+            s"""
+               |SQL job scan file size in bytes: $scanFileSize
+               |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
+               |detail as below:
+               |Table: ${table.name()}
+               |""".stripMargin)
+        }
+      }
     case _ =>
   }
 }
```
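The new case relies on the size estimate Spark exposes for DSv2 scans: connectors implementing `SupportsReportStatistics` surface their reported `sizeInBytes` through `DataSourceV2ScanRelation.computeStats()`, while others fall back to Spark's generic estimate. A sketch of inspecting that value directly (the table name is hypothetical):

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Read the same sizeInBytes that MaxScanStrategy compares against the cap.
val scanSize = spark.sql("SELECT * FROM dsv2_catalog.db.events")
  .queryExecution.optimizedPlan.collectFirst {
    case r: DataSourceV2ScanRelation => r.computeStats().sizeInBytes
  }
println(scanSize)
```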
New test helper `ReportStatisticsAndPartitionAwareDataSource`, a DSv2 source that reports statistics and declares a partitioning:

```diff
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform}
+import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder, Statistics, SupportsReportPartitioning, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder(
+      val partitionKeys: Seq[String]) extends SimpleScanBuilder
+    with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10)
+    }
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new MyScanBuilder(Seq("i"))
+      }
+
+      override def partitioning(): Array[Transform] = {
+        Array(Expressions.identity("i"))
+      }
+    }
+  }
+}
```
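The helper reports a fixed `sizeInBytes` of 80 and declares identity partitioning on column `i`, so it drives the partitioned branch of the new case. It is loaded by class name through the standard DataFrameReader, as the test below does:

```scala
val df = spark.read
  .format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName)
  .load()
```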
New test helper `ReportStatisticsDataSource`, a DSv2 source that reports statistics but no partitioning:

```diff
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector._
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder extends SimpleScanBuilder
+    with SupportsReportStatistics {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new MyScanBuilder
+      }
+    }
+  }
+}
```
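This second helper reports the same statistics but declares no `partitioning()`, so it drives the non-partitioned branch. A sketch mirroring the test's extraction of the estimate:

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
val relation = nonPartDf.queryExecution.optimizedPlan.collect {
  case d: DataSourceV2ScanRelation => d
}.head
// 80 bytes, as reported by the helper's estimateStatistics().
println(relation.computeStats().sizeInBytes)
```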
In `WatchDogSuiteBase`:

```diff
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
@@ -607,4 +608,36 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       assert(e.getMessage == "Script transformation is not allowed")
     }
   }
+
+  test("watchdog with scan maxFileSize -- data source v2") {
+    val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
+    df.createOrReplaceTempView("test")
+    val logical = df.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val tableSize = logical.computeStats().sizeInBytes.toLong
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
+      sql("SELECT * FROM test").queryExecution.sparkPlan
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
+    }
+
+    val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
+    nonPartDf.createOrReplaceTempView("test_non_part")
+    val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
+      sql("SELECT * FROM test_non_part").queryExecution.sparkPlan
+    }
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
+    }
+  }
 }
```
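For completeness, a sketch of what a client sees when the cap is exceeded; the exception class comes from this patch, while the catch site is illustrative:

```scala
import org.apache.kyuubi.sql.watchdog.MaxFileSizeExceedException

try {
  spark.sql("SELECT * FROM test").queryExecution.sparkPlan // force planning
} catch {
  case e: MaxFileSizeExceedException =>
    // The message reports the scanned size, the configured cap, the table
    // name and, for partitioned tables, the partition structure.
    println(e.getMessage)
}
```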