[KYUUBI #7122] Support ORC hive table pushdown filter

### Why are the changes needed?

Previously, the `HiveScan` class was used to read data. If it is determined to be ORC type, the `ORCScan` from Spark datasourcev2 can be used. `ORCScan` supports pushfilter down, but `HiveScan` does not yet support it.

In our testing, we are able to achieve approximately 2x performance improvement.

The conversation can be controlled by setting `spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc`. When enabled, the data source ORC reader is used to process ORC tables created by using the HiveQL syntax, instead of Hive SerDe.

close https://github.com/apache/kyuubi/issues/7122

### How was this patch tested?

added unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #7123 from flaming-archer/master_scanbuilder_new.

Closes #7122

c3f412f90 [tian bao] add case _
2be48909f [tian bao] Merge branch 'master_scanbuilder_new' of github.com:flaming-archer/kyuubi into master_scanbuilder_new
c825d0f8c [tian bao] review change
8a26d6a8a [tian bao] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
68d41969f [tian bao] review change
bed007fea [tian bao] review change
b89e6e67a [tian bao] Optimize UT
5a8941b2d [tian bao] fix failed ut
dc1ba47e3 [tian bao] orc pushdown version 0

Authored-by: tian bao <2011xuesong@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
tian bao 2025-07-09 13:38:51 +08:00 committed by Cheng Pan
parent 84928184fc
commit 60371b5dd5
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
7 changed files with 170 additions and 17 deletions

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.spark.connector.hive
import java.util
import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable
@ -31,10 +32,12 @@ import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, HiveScanBuilder}
import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder
@ -59,6 +62,20 @@ case class HiveTable(
catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
}
lazy val convertedProvider: Option[String] = {
val serde = catalogTable.storage.serde.getOrElse("").toUpperCase(Locale.ROOT)
val parquet = serde.contains("PARQUET")
val orc = serde.contains("ORC")
val provider = catalogTable.provider.map(_.toUpperCase(Locale.ROOT))
if (orc || provider.contains("ORC")) {
Some("ORC")
} else if (parquet || provider.contains("PARQUET")) {
Some("PARQUET")
} else {
None
}
}
override def name(): String = catalogTable.identifier.unquotedString
override def schema(): StructType = catalogTable.schema
@ -77,7 +94,11 @@ case class HiveTable(
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
convertedProvider match {
case Some("ORC") if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) =>
OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
}
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {

View File

@ -19,6 +19,8 @@ package org.apache.kyuubi.spark.connector.hive
import java.util.Locale
import org.apache.spark.sql.internal.SQLConf.buildConf
object KyuubiHiveConnectorConf {
import org.apache.spark.sql.internal.SQLConf.buildStaticConf
@ -39,4 +41,12 @@ object KyuubiHiveConnectorConf {
"Invalid value for 'spark.sql.kyuubi.hive.connector.externalCatalog.share.policy'." +
"Valid values are 'ONE_FOR_ONE', 'ONE_FOR_ALL'.")
.createWithDefault(OneForAllPolicy.name)
val READ_CONVERT_METASTORE_ORC =
buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc")
.doc("When enabled, the data source ORC reader is used to process " +
"ORC tables created by using the HiveQL syntax, instead of Hive SerDe.")
.version("1.11.0")
.booleanConf
.createWithDefault(true)
}

View File

@ -54,6 +54,13 @@ class HiveCatalogFileIndex(
override def partitionSchema: StructType = table.partitionSchema
override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val fileIndex = filterPartitions(partitionFilters)
fileIndex.listFiles(partitionFilters, dataFilters)
}
private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
: (Seq[PartitionDirectory], Map[PartitionDirectory, CatalogTablePartition]) = {
val fileIndex = filterPartitions(partitionFilters)

View File

@ -31,11 +31,13 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
import org.apache.kyuubi.spark.connector.hive.read.HiveScan
class HiveCatalogSuite extends KyuubiHiveTest {
@ -355,8 +357,29 @@ class HiveCatalogSuite extends KyuubiHiveTest {
val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
orcProps.put(TableCatalog.PROP_PROVIDER, "orc")
val ot = catalog.createTable(orc_table, schema, Array.empty[Transform], orcProps)
val orcScan = ot.asInstanceOf[HiveTable]
.newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
assert(orcScan.isSplitable(new Path("empty")))
Seq("true", "false").foreach { value =>
withSparkSession(Map(READ_CONVERT_METASTORE_ORC.key -> value)) { _ =>
val scan = ot.asInstanceOf[HiveTable]
.newScanBuilder(CaseInsensitiveStringMap.empty()).build()
val orcScan = value match {
case "true" =>
assert(
scan.isInstanceOf[OrcScan],
s"Expected OrcScan, got ${scan.getClass.getSimpleName}")
scan.asInstanceOf[OrcScan]
case "false" =>
assert(
scan.isInstanceOf[HiveScan],
s"Expected HiveScan, got ${scan.getClass.getSimpleName}")
scan.asInstanceOf[HiveScan]
case _ =>
throw new IllegalArgumentException(
s"Unexpected value: '$value'. Only 'true' or 'false' are allowed.")
}
assert(orcScan.isSplitable(new Path("empty")))
}
}
}
}

View File

@ -260,6 +260,99 @@ class HiveQuerySuite extends KyuubiHiveTest {
}
}
test("ORC filter pushdown") {
val table = "hive.default.orc_filter_pushdown"
withTable(table) {
spark.sql(
s"""
| CREATE TABLE $table (
| id INT,
| data STRING,
| value INT
| ) PARTITIONED BY (dt STRING, region STRING)
| STORED AS ORC
| """.stripMargin).collect()
// Insert test data with partitions
spark.sql(
s"""
| INSERT INTO $table PARTITION (dt='2024-01-01', region='east')
| VALUES (1, 'a', 100), (2, 'b', 200), (11, 'aa', 100), (22, 'b', 200)
|""".stripMargin)
spark.sql(
s"""
| INSERT INTO $table PARTITION (dt='2024-01-01', region='west')
| VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 400)
|""".stripMargin)
spark.sql(
s"""
| INSERT INTO $table PARTITION (dt='2024-01-02', region='east')
| VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 600)
| """.stripMargin)
// Test multiple partition filters
val df1 = spark.sql(
s"""
| SELECT * FROM $table
| WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500
|""".stripMargin)
assert(df1.count() === 0)
// Test multiple partition filters
val df2 = spark.sql(
s"""
| SELECT * FROM $table
| WHERE dt = '2024-01-01' AND region = 'east' AND value > 150
|""".stripMargin)
assert(df2.count() === 2)
assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22))
// Test explain
val df3 = spark.sql(
s"""
| EXPLAIN SELECT count(*) as total_rows
| FROM $table
| WHERE dt = '2024-01-01' AND region = 'east' AND value > 1
|""".stripMargin)
assert(df3.count() === 1)
// contains like : PushedFilters: [IsNotNull(value), GreaterThan(value,1)]
assert(df3.collect().map(_.getString(0)).filter { s =>
s.contains("PushedFilters") && !s.contains("PushedFilters: []")
}.toSet.size == 1)
// Test aggregation pushdown partition filters
spark.conf.set("spark.sql.orc.aggregatePushdown", true)
// Test aggregation pushdown partition filters
val df4 = spark.sql(
s"""
| SELECT count(*) as total_rows
| FROM $table
| WHERE dt = '2024-01-01' AND region = 'east'
| group by dt, region
| """.stripMargin)
assert(df4.count() === 1)
assert(df4.collect().map(_.getLong(0)).toSet === Set(4L))
val df5 = spark.sql(
s"""
| EXPLAIN SELECT count(*) as total_rows
| FROM $table
| WHERE dt = '2024-01-01' AND region = 'east'
| group by dt, region
| """.stripMargin)
assert(df5.count() === 1)
// contains like : PushedAggregation: [COUNT(*)],
assert(df5.collect().map(_.getString(0)).filter { s =>
s.contains("PushedAggregation") && !s.contains("PushedAggregation: []")
}.toSet.size == 1)
spark.conf.set("spark.sql.orc.aggregatePushdown", false)
}
}
private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = {
withSparkSession() { spark =>
val table = "hive.default.employee"

View File

@ -21,6 +21,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils
import org.apache.kyuubi.spark.connector.common.LocalSparkSession
@ -77,5 +78,16 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
f(innerSpark)
}
/**
* Drops table `tableName` after calling `f`.
*/
protected def withTable(tableNames: String*)(f: => Unit): Unit = {
Utils.tryWithSafeFinally(f) {
tableNames.foreach { name =>
spark.sql(s"DROP TABLE IF EXISTS $name")
}
}
}
override def spark: SparkSession = innerSpark
}

View File

@ -78,19 +78,6 @@ trait DDLCommandTestUtils extends KyuubiHiveTest {
fs.makeQualified(hadoopPath).toUri
}
/**
* Drops table `tableName` after calling `f`.
*/
protected def withTable(tableNames: String*)(f: => Unit): Unit = {
try {
f
} finally {
tableNames.foreach { name =>
spark.sql(s"DROP TABLE IF EXISTS $name")
}
}
}
protected def withNamespaceAndTable(
ns: String,
tableName: String,