[KYUUBI #6990] Add rebalance before InsertIntoHiveDirCommand and InsertIntoDataSourceDirCommand to align with behaviors of hive

### Why are the changes needed?

When users switch from Hive to Spark, for SQL statements like `INSERT OVERWRITE DIRECTORY ... AS SELECT`, it would be great if small files could be automatically merged through simple configuration, just like in Hive.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #6991 from Z1Wu/feat/add_insert_dir_rebalance_support.

Closes #6990

2820bb2d2 [wuziyi] [fix] nit
a69c04191 [wuziyi] [fix] nit
951a7738f [wuziyi] [fix] nit
f75dfcb3a [wuziyi] [Feat] add rebalance before InsertIntoHiveDirCommand and InsertIntoDataSourceDirCommand to align with behaviors of hive

Authored-by: wuziyi <wuziyi02@corp.netease.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
wuziyi 2025-03-25 00:52:55 +08:00 committed by Cheng Pan
parent 338206e8a7
commit 2080c2186c
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
6 changed files with 150 additions and 9 deletions

View File

@ -20,9 +20,9 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
import org.apache.spark.sql.internal.StaticSQLConf
trait RepartitionBuilder extends Rule[LogicalPlan] with RepartitionBeforeWriteHelper {
@ -59,6 +59,10 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder
query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
if query.resolved && canInsertRepartitionByExpression(query) =>
i.copy(query = buildRepartition(Seq.empty, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRepartition))
@ -101,6 +105,10 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder {
query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
case c @ InsertIntoHiveDirCommand(_, _, query, _, _)
if query.resolved && canInsertRepartitionByExpression(query) =>
c.copy(query = buildRepartition(Seq.empty, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRepartition))

View File

@ -19,9 +19,9 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, Sort}
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.command.{DataWritingCommand, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, OptimizedCreateHiveTableAsSelectCommand}
import org.apache.kyuubi.sql.KyuubiSQLConf
@ -272,4 +272,42 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
}
}
}
// Verifies that the extension injects a RebalancePartitions node into the
// analyzed plan of a Hive-serde `INSERT OVERWRITE DIRECTORY ... STORED AS`
// statement (planned as InsertIntoHiveDirCommand).
test("Test rebalance in InsertIntoHiveDirCommand") {
withSQLConf(
// NOTE(review): presumably these two keep Spark from converting the Hive
// write into a datasource command, so the plan stays an
// InsertIntoHiveDirCommand — confirm against HiveUtils semantics.
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
// Enable the Kyuubi rule under test.
KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
withTempDir(tmpDir => {
// 1000 rows spread over 10 input partitions — a write with no shuffle.
spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
s"STORED AS PARQUET SELECT * FROM tmp_table")
// The analyzed plan must contain exactly one InsertIntoHiveDirCommand ...
val insertHiveDirCommand = df.queryExecution.analyzed.collect {
case _: InsertIntoHiveDirCommand => true
}
assert(insertHiveDirCommand.size == 1)
// ... and exactly one RebalancePartitions inserted above its query.
val repartition = df.queryExecution.analyzed.collect {
case _: RebalancePartitions => true
}
assert(repartition.size == 1)
})
}
}
// Verifies that the extension injects a RebalancePartitions node into the
// query of a datasource `INSERT OVERWRITE DIRECTORY ... USING` statement
// (planned as InsertIntoDataSourceDirCommand).
test("Test rebalance in InsertIntoDataSourceDirCommand") {
withSQLConf(
// Enable the Kyuubi rule under test.
KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
withTempDir(tmpDir => {
// 1000 rows spread over 10 input partitions — a write with no shuffle.
spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
s"USING PARQUET SELECT * FROM tmp_table")
// The analyzed plan root is the command itself ...
assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
// ... and its inner query must contain exactly one RebalancePartitions.
val repartition =
df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect {
case _: RebalancePartitions => true
}
assert(repartition.size == 1)
})
}
}
}

View File

@ -20,8 +20,9 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}
import org.apache.spark.sql.internal.StaticSQLConf
trait RepartitionBuilder extends Rule[LogicalPlan] with RepartitionBeforeWriteHelper {
@ -52,6 +53,10 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder
val dynamicPartitionColumns = pc.filterNot(attr => sp.contains(attr.name))
i.copy(query = buildRepartition(dynamicPartitionColumns, query))
case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
if query.resolved && canInsertRepartitionByExpression(query) =>
i.copy(query = buildRepartition(Seq.empty, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRepartition))
@ -82,6 +87,10 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder {
.flatMap(name => query.output.find(_.name == name)).toSeq
i.copy(query = buildRepartition(dynamicPartitionColumns, query))
case i @ InsertIntoHiveDirCommand(_, _, query, _, _)
if query.resolved && canInsertRepartitionByExpression(query) =>
i.copy(query = buildRepartition(Seq.empty, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRepartition))

View File

@ -19,10 +19,10 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, Sort}
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.command.{DataWritingCommand, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}
import org.apache.kyuubi.sql.KyuubiSQLConf
@ -267,4 +267,42 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
}
}
}
// Verifies that the extension injects a RebalancePartitions node into the
// analyzed plan of a Hive-serde `INSERT OVERWRITE DIRECTORY ... STORED AS`
// statement (planned as InsertIntoHiveDirCommand).
test("Test rebalance in InsertIntoHiveDirCommand") {
withSQLConf(
// NOTE(review): presumably these two keep Spark from converting the Hive
// write into a datasource command, so the plan stays an
// InsertIntoHiveDirCommand — confirm against HiveUtils semantics.
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
// Enable the Kyuubi rule under test.
KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
withTempDir(tmpDir => {
// 1000 rows spread over 10 input partitions — a write with no shuffle.
spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
s"STORED AS PARQUET SELECT * FROM tmp_table")
// The analyzed plan must contain exactly one InsertIntoHiveDirCommand ...
val insertHiveDirCommand = df.queryExecution.analyzed.collect {
case _: InsertIntoHiveDirCommand => true
}
assert(insertHiveDirCommand.size == 1)
// ... and exactly one RebalancePartitions inserted above its query.
val repartition = df.queryExecution.analyzed.collect {
case _: RebalancePartitions => true
}
assert(repartition.size == 1)
})
}
}
// Verifies that the extension injects a RebalancePartitions node into the
// query of a datasource `INSERT OVERWRITE DIRECTORY ... USING` statement
// (planned as InsertIntoDataSourceDirCommand).
test("Test rebalance in InsertIntoDataSourceDirCommand") {
withSQLConf(
// Enable the Kyuubi rule under test.
KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
withTempDir(tmpDir => {
// 1000 rows spread over 10 input partitions — a write with no shuffle.
spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
s"USING PARQUET SELECT * FROM tmp_table")
// The analyzed plan root is the command itself ...
assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
// ... and its inner query must contain exactly one RebalancePartitions.
val repartition =
df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect {
case _: RebalancePartitions => true
}
assert(repartition.size == 1)
})
}
}
}

View File

@ -23,8 +23,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}
import org.apache.spark.sql.internal.StaticSQLConf
trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
@ -112,6 +113,10 @@ case class RebalanceBeforeWritingDatasource(session: SparkSession)
val dynamicPartitionColumns = pc.filterNot(attr => sp.contains(attr.name))
i.copy(query = buildRebalance(dynamicPartitionColumns, query))
case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
if query.resolved && canInsertRebalance(query) =>
i.copy(query = buildRebalance(Seq.empty, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRebalance))
@ -144,6 +149,10 @@ case class RebalanceBeforeWritingHive(session: SparkSession)
.flatMap(name => query.output.find(_.name == name)).toSeq
i.copy(query = buildRebalance(dynamicPartitionColumns, query))
case i @ InsertIntoHiveDirCommand(_, _, query, _, _)
if query.resolved && canInsertRebalance(query) =>
i.copy(query = buildRebalance(Seq.empty, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRebalance))

View File

@ -20,9 +20,10 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, Sort}
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}
import org.apache.kyuubi.sql.KyuubiSQLConf
@ -292,4 +293,42 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
}
}
}
// Verifies that the extension injects a RebalancePartitions node into the
// analyzed plan of a Hive-serde `INSERT OVERWRITE DIRECTORY ... STORED AS`
// statement (planned as InsertIntoHiveDirCommand).
test("Test rebalance in InsertIntoHiveDirCommand") {
withSQLConf(
// NOTE(review): presumably these two keep Spark from converting the Hive
// write into a datasource command, so the plan stays an
// InsertIntoHiveDirCommand — confirm against HiveUtils semantics.
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
// Enable the Kyuubi rule under test.
KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
withTempDir(tmpDir => {
// 1000 rows spread over 10 input partitions — a write with no shuffle.
spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
s"STORED AS PARQUET SELECT * FROM tmp_table")
// The analyzed plan must contain exactly one InsertIntoHiveDirCommand ...
val insertHiveDirCommand = df.queryExecution.analyzed.collect {
case _: InsertIntoHiveDirCommand => true
}
assert(insertHiveDirCommand.size == 1)
// ... and exactly one RebalancePartitions inserted above its query.
val repartition = df.queryExecution.analyzed.collect {
case _: RebalancePartitions => true
}
assert(repartition.size == 1)
})
}
}
// Verifies that the extension injects a RebalancePartitions node into the
// query of a datasource `INSERT OVERWRITE DIRECTORY ... USING` statement
// (planned as InsertIntoDataSourceDirCommand).
test("Test rebalance in InsertIntoDataSourceDirCommand") {
withSQLConf(
// Enable the Kyuubi rule under test.
KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
withTempDir(tmpDir => {
// 1000 rows spread over 10 input partitions — a write with no shuffle.
spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
s"USING PARQUET SELECT * FROM tmp_table")
// The analyzed plan root is the command itself ...
assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
// ... and its inner query must contain exactly one RebalancePartitions.
val repartition =
df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect {
case _: RebalancePartitions => true
}
assert(repartition.size == 1)
})
}
}
}