From 60371b5dd58592c66dcfb6883f9080abeeb8c2fa Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Wed, 9 Jul 2025 13:38:51 +0800 Subject: [PATCH] [KYUUBI #7122] Support ORC hive table pushdown filter ### Why are the changes needed? Previously, the `HiveScan` class was used to read data. If it is determined to be ORC type, the `ORCScan` from Spark datasourcev2 can be used. `ORCScan` supports filter pushdown, but `HiveScan` does not yet support it. In our testing, we are able to achieve approximately 2x performance improvement. The conversion can be controlled by setting `spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc`. When enabled, the data source ORC reader is used to process ORC tables created by using the HiveQL syntax, instead of Hive SerDe. close https://github.com/apache/kyuubi/issues/7122 ### How was this patch tested? added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #7123 from flaming-archer/master_scanbuilder_new. 
Closes #7122 c3f412f90 [tian bao] add case _ 2be48909f [tian bao] Merge branch 'master_scanbuilder_new' of github.com:flaming-archer/kyuubi into master_scanbuilder_new c825d0f8c [tian bao] review change 8a26d6a8a [tian bao] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala 68d41969f [tian bao] review change bed007fea [tian bao] review change b89e6e67a [tian bao] Optimize UT 5a8941b2d [tian bao] fix failed ut dc1ba47e3 [tian bao] orc pushdown version 0 Authored-by: tian bao <2011xuesong@gmail.com> Signed-off-by: Cheng Pan --- .../spark/connector/hive/HiveTable.scala | 23 ++++- .../hive/KyuubiHiveConnectorConf.scala | 10 ++ .../connector/hive/read/HiveFileIndex.scala | 7 ++ .../connector/hive/HiveCatalogSuite.scala | 29 +++++- .../spark/connector/hive/HiveQuerySuite.scala | 93 +++++++++++++++++++ .../spark/connector/hive/KyuubiHiveTest.scala | 12 +++ .../hive/command/DDLCommandTestUtils.scala | 13 --- 7 files changed, 170 insertions(+), 17 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala index ee6d5fc23..afa2527fd 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.spark.connector.hive import java.util +import java.util.Locale import scala.collection.JavaConverters._ import scala.collection.mutable @@ -31,10 +32,12 @@ import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.ScanBuilder import 
org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, HiveScanBuilder} import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder @@ -59,6 +62,20 @@ case class HiveTable( catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) } + lazy val convertedProvider: Option[String] = { + val serde = catalogTable.storage.serde.getOrElse("").toUpperCase(Locale.ROOT) + val parquet = serde.contains("PARQUET") + val orc = serde.contains("ORC") + val provider = catalogTable.provider.map(_.toUpperCase(Locale.ROOT)) + if (orc || provider.contains("ORC")) { + Some("ORC") + } else if (parquet || provider.contains("PARQUET")) { + Some("PARQUET") + } else { + None + } + } + override def name(): String = catalogTable.identifier.unquotedString override def schema(): StructType = catalogTable.schema @@ -77,7 +94,11 @@ case class HiveTable( } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable) + convertedProvider match { + case Some("ORC") if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) => + OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) + case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable) + } } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala index cc5ffde9c..075f12060 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala @@ -19,6 +19,8 @@ package org.apache.kyuubi.spark.connector.hive import java.util.Locale +import org.apache.spark.sql.internal.SQLConf.buildConf + object KyuubiHiveConnectorConf { import org.apache.spark.sql.internal.SQLConf.buildStaticConf @@ -39,4 +41,12 @@ object KyuubiHiveConnectorConf { "Invalid value for 'spark.sql.kyuubi.hive.connector.externalCatalog.share.policy'." + "Valid values are 'ONE_FOR_ONE', 'ONE_FOR_ALL'.") .createWithDefault(OneForAllPolicy.name) + + val READ_CONVERT_METASTORE_ORC = + buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc") + .doc("When enabled, the data source ORC reader is used to process " + + "ORC tables created by using the HiveQL syntax, instead of Hive SerDe.") + .version("1.11.0") + .booleanConf + .createWithDefault(true) } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala index 55c9168ed..0142c5561 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala @@ -54,6 +54,13 @@ class HiveCatalogFileIndex( override def partitionSchema: StructType = table.partitionSchema + override def listFiles( + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = { + val fileIndex = filterPartitions(partitionFilters) + fileIndex.listFiles(partitionFilters, dataFilters) + } + private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]) : (Seq[PartitionDirectory], Map[PartitionDirectory, CatalogTablePartition]) = { val fileIndex = filterPartitions(partitionFilters) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index 0485087bf..c083d7550 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -31,11 +31,13 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._ import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper +import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC import org.apache.kyuubi.spark.connector.hive.read.HiveScan class HiveCatalogSuite extends KyuubiHiveTest { @@ -355,8 +357,29 @@ class HiveCatalogSuite extends KyuubiHiveTest { val orcProps: util.Map[String, String] = new util.HashMap[String, String]() orcProps.put(TableCatalog.PROP_PROVIDER, "orc") val ot = catalog.createTable(orc_table, 
schema, Array.empty[Transform], orcProps) - val orcScan = ot.asInstanceOf[HiveTable] - .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan] - assert(orcScan.isSplitable(new Path("empty"))) + + Seq("true", "false").foreach { value => + withSparkSession(Map(READ_CONVERT_METASTORE_ORC.key -> value)) { _ => + val scan = ot.asInstanceOf[HiveTable] + .newScanBuilder(CaseInsensitiveStringMap.empty()).build() + + val orcScan = value match { + case "true" => + assert( + scan.isInstanceOf[OrcScan], + s"Expected OrcScan, got ${scan.getClass.getSimpleName}") + scan.asInstanceOf[OrcScan] + case "false" => + assert( + scan.isInstanceOf[HiveScan], + s"Expected HiveScan, got ${scan.getClass.getSimpleName}") + scan.asInstanceOf[HiveScan] + case _ => + throw new IllegalArgumentException( + s"Unexpected value: '$value'. Only 'true' or 'false' are allowed.") + } + assert(orcScan.isSplitable(new Path("empty"))) + } + } } } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala index 65ad81d95..35b2f5476 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala @@ -260,6 +260,99 @@ class HiveQuerySuite extends KyuubiHiveTest { } } + test("ORC filter pushdown") { + val table = "hive.default.orc_filter_pushdown" + withTable(table) { + spark.sql( + s""" + | CREATE TABLE $table ( + | id INT, + | data STRING, + | value INT + | ) PARTITIONED BY (dt STRING, region STRING) + | STORED AS ORC + | """.stripMargin).collect() + + // Insert test data with partitions + spark.sql( + s""" + | INSERT INTO $table PARTITION (dt='2024-01-01', region='east') + | VALUES (1, 'a', 100), (2, 
'b', 200), (11, 'aa', 100), (22, 'b', 200) + |""".stripMargin) + + spark.sql( + s""" + | INSERT INTO $table PARTITION (dt='2024-01-01', region='west') + | VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 400) + |""".stripMargin) + spark.sql( + s""" + | INSERT INTO $table PARTITION (dt='2024-01-02', region='east') + | VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 600) + | """.stripMargin) + + // Test multiple partition filters + val df1 = spark.sql( + s""" + | SELECT * FROM $table + | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500 + |""".stripMargin) + assert(df1.count() === 0) + + // Test multiple partition filters + val df2 = spark.sql( + s""" + | SELECT * FROM $table + | WHERE dt = '2024-01-01' AND region = 'east' AND value > 150 + |""".stripMargin) + assert(df2.count() === 2) + assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22)) + + // Test explain + val df3 = spark.sql( + s""" + | EXPLAIN SELECT count(*) as total_rows + | FROM $table + | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1 + |""".stripMargin) + assert(df3.count() === 1) + // contains like : PushedFilters: [IsNotNull(value), GreaterThan(value,1)] + assert(df3.collect().map(_.getString(0)).filter { s => + s.contains("PushedFilters") && !s.contains("PushedFilters: []") + }.toSet.size == 1) + + // Test aggregation pushdown partition filters + spark.conf.set("spark.sql.orc.aggregatePushdown", true) + + // Test aggregation pushdown partition filters + val df4 = spark.sql( + s""" + | SELECT count(*) as total_rows + | FROM $table + | WHERE dt = '2024-01-01' AND region = 'east' + | group by dt, region + | """.stripMargin) + assert(df4.count() === 1) + assert(df4.collect().map(_.getLong(0)).toSet === Set(4L)) + + val df5 = spark.sql( + s""" + | EXPLAIN SELECT count(*) as total_rows + | FROM $table + | WHERE dt = '2024-01-01' AND region = 'east' + | group by dt, region + | """.stripMargin) + assert(df5.count() === 1) + // contains like : 
PushedAggregation: [COUNT(*)], + assert(df5.collect().map(_.getString(0)).filter { s => + s.contains("PushedAggregation") && !s.contains("PushedAggregation: []") + }.toSet.size == 1) + + spark.conf.set("spark.sql.orc.aggregatePushdown", false) + + } + } + private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = { withSparkSession() { spark => val table = "hive.default.employee" diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala index 851659b15..fb5bcd621 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala @@ -21,6 +21,7 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.{QueryTest, SparkSession} import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog} +import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils import org.apache.kyuubi.spark.connector.common.LocalSparkSession @@ -77,5 +78,16 @@ abstract class KyuubiHiveTest extends QueryTest with Logging { f(innerSpark) } + /** + * Drops table `tableName` after calling `f`. 
+ */ + protected def withTable(tableNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + tableNames.foreach { name => + spark.sql(s"DROP TABLE IF EXISTS $name") + } + } + } + override def spark: SparkSession = innerSpark } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala index 83fd95b6b..2a5602554 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala @@ -78,19 +78,6 @@ trait DDLCommandTestUtils extends KyuubiHiveTest { fs.makeQualified(hadoopPath).toUri } - /** - * Drops table `tableName` after calling `f`. - */ - protected def withTable(tableNames: String*)(f: => Unit): Unit = { - try { - f - } finally { - tableNames.foreach { name => - spark.sql(s"DROP TABLE IF EXISTS $name") - } - } - } - protected def withNamespaceAndTable( ns: String, tableName: String,