[KYUUBI #1974] Support merge small files in multi insert statement

### _Why are the changes needed?_

This PR aims to support auto merge small files in multi insert statement, for example

`FROM VALUES(1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;`

will generate the following plan, `Union` is the root node instead of `InsertIntoHiveTable`

```
Union
:- InsertIntoHiveTable
:  +- Project
:    +- LocalRelation
+- InsertIntoHiveTable
   +- Project
     +- LocalRelation
```

This PR also fixed the `canInsertRepartitionByExpression`, previous it did not consider the `SubqueryAlias` which may cause inserting error `Repartition`/`Reblance` node and currupt the data distribution, e.g.

`FROM (SELECT * FROM VALUES(1) DOSTRIBUTE BY col1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;`

```
Union
:- InsertIntoHiveTable
:  +- Project
:     +- SubqueryAlias
:        +- RepartitionByExpression
:           +- Project
:              +- LocalRelation
+- InsertIntoHiveTable
   +- Project
      +- SubqueryAlias
         +- RepartitionByExpression
            +- Project
               +- LocalRelation
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1974 from pan3793/ext.

Closes #1974

56cd7734 [Cheng Pan] nit
e0155c27 [Cheng Pan] Support merge small files in multi table insertion

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2022-02-24 21:10:46 +08:00
parent 6d757a3df9
commit a3d9ca31d3
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 66 additions and 4 deletions

View File

@ -26,14 +26,14 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check repartition exists") {
def check(df: DataFrame): Unit = {
def check(df: DataFrame, expectedRepartitionNum: Int = 1): Unit = {
assert(
df.queryExecution.analyzed.collect {
case r: RepartitionByExpression =>
assert(r.optNumPartitions ===
spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
r
}.size == 1)
}.size == expectedRepartitionNum)
}
// It's better to set config explicitly in case of we change the default value.
@ -45,6 +45,18 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
"SELECT * FROM VALUES(1),(2) AS t(c1)"))
}
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)")
check(
sql(
"""FROM VALUES(1),(2) AS t(c1)
|INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
|INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
|""".stripMargin),
2)
}
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
@ -52,6 +64,25 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
"SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
}
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
check(
sql(
"""FROM VALUES(1),(2),(3)
|INSERT INTO TABLE tmp1 SELECT *
|INSERT INTO TABLE tmp2 SELECT *
|""".stripMargin),
2)
check(
sql(
"""FROM (SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1)
|INSERT INTO TABLE tmp1 SELECT *
|INSERT INTO TABLE tmp2 SELECT *
|""".stripMargin),
2)
}
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
}

View File

@ -26,11 +26,11 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
def check(df: DataFrame): Unit = {
def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
assert(
df.queryExecution.analyzed.collect {
case r: RebalancePartitions => r
}.size == 1)
}.size == expectedRebalanceNum)
}
// It's better to set config explicitly in case of we change the default value.
@ -42,11 +42,35 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
"SELECT * FROM VALUES(1),(2) AS t(c1)"))
}
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)")
check(
sql(
"""FROM VALUES(1),(2)
|INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
|INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
|""".stripMargin),
2)
}
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
}
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
check(
sql(
"""FROM VALUES(1),(2),(3)
|INSERT INTO TABLE tmp1 SELECT *
|INSERT INTO TABLE tmp2 SELECT *
|""".stripMargin),
2)
}
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
}

View File

@ -59,6 +59,9 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder
query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRepartition))
case _ => plan
}
}
@ -98,6 +101,9 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder {
query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
case u @ Union(children, _, _) =>
u.copy(children = children.map(addRepartition))
case _ => plan
}
}
@ -105,6 +111,7 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder {
trait RepartitionBeforeWriteHelper {
def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan match {
case Project(_, child) => canInsertRepartitionByExpression(child)
case SubqueryAlias(_, child) => canInsertRepartitionByExpression(child)
case Limit(_, _) => false
case _: Sort => false
case _: RepartitionByExpression => false