From a3d9ca31d3cbc9ff34484a8ff618e25dbb5b8d3d Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 24 Feb 2022 21:10:46 +0800 Subject: [PATCH] [KYUUBI #1974] Support merge small files in multi insert statement ### _Why are the changes needed?_ This PR aims to support auto merge small files in multi insert statement, for example `FROM VALUES(1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;` will generate the following plan, `Union` is the root node instead of `InsertIntoHiveTable` ``` Union :- InsertIntoHiveTable : +- Project : +- LocalRelation +- InsertIntoHiveTable +- Project +- LocalRelation ``` This PR also fixed `canInsertRepartitionByExpression`; previously it did not consider `SubqueryAlias`, which may cause inserting erroneous `Repartition`/`Rebalance` nodes and corrupt the data distribution, e.g. `FROM (SELECT * FROM VALUES(1) DISTRIBUTE BY col1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;` ``` Union :- InsertIntoHiveTable : +- Project : +- SubqueryAlias : +- RepartitionByExpression : +- Project : +- LocalRelation +- InsertIntoHiveTable +- Project +- SubqueryAlias +- RepartitionByExpression +- Project +- LocalRelation ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #1974 from pan3793/ext. 
Closes #1974 56cd7734 [Cheng Pan] nit e0155c27 [Cheng Pan] Support merge small files in multi table insertion Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../sql/RepartitionBeforeWritingSuite.scala | 35 +++++++++++++++++-- .../sql/RebalanceBeforeWritingSuite.scala | 28 +++++++++++++-- .../sql/RepartitionBeforeWritingBase.scala | 7 ++++ 3 files changed, 66 insertions(+), 4 deletions(-) diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala index 8b07f543e..f978623af 100644 --- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala +++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala @@ -26,14 +26,14 @@ import org.apache.kyuubi.sql.KyuubiSQLConf class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest { test("check repartition exists") { - def check(df: DataFrame): Unit = { + def check(df: DataFrame, expectedRepartitionNum: Int = 1): Unit = { assert( df.queryExecution.analyzed.collect { case r: RepartitionByExpression => assert(r.optNumPartitions === spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)) r - }.size == 1) + }.size == expectedRepartitionNum) } // It's better to set config explicitly in case of we change the default value. 
@@ -45,6 +45,18 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest { "SELECT * FROM VALUES(1),(2) AS t(c1)")) } + withTable("tmp1", "tmp2") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)") + sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)") + check( + sql( + """FROM VALUES(1),(2) AS t(c1) + |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT * + |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT * + |""".stripMargin), + 2) + } + withTable("tmp1") { sql(s"CREATE TABLE tmp1 (c1 int) $storage") check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)")) @@ -52,6 +64,25 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest { "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")) } + withTable("tmp1", "tmp2") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage") + sql(s"CREATE TABLE tmp2 (c1 int) $storage") + check( + sql( + """FROM VALUES(1),(2),(3) + |INSERT INTO TABLE tmp1 SELECT * + |INSERT INTO TABLE tmp2 SELECT * + |""".stripMargin), + 2) + check( + sql( + """FROM (SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1) + |INSERT INTO TABLE tmp1 SELECT * + |INSERT INTO TABLE tmp2 SELECT * + |""".stripMargin), + 2) + } + withTable("tmp1") { sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)") } diff --git a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala index f40b43213..f1a27cdb8 100644 --- a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala +++ b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala @@ -26,11 +26,11 @@ import org.apache.kyuubi.sql.KyuubiSQLConf class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest { test("check rebalance exists") { - def check(df: 
DataFrame): Unit = { + def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = { assert( df.queryExecution.analyzed.collect { case r: RebalancePartitions => r - }.size == 1) + }.size == expectedRebalanceNum) } // It's better to set config explicitly in case of we change the default value. @@ -42,11 +42,35 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest { "SELECT * FROM VALUES(1),(2) AS t(c1)")) } + withTable("tmp1", "tmp2") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)") + sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)") + check( + sql( + """FROM VALUES(1),(2) + |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT * + |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT * + |""".stripMargin), + 2) + } + withTable("tmp1") { sql(s"CREATE TABLE tmp1 (c1 int) $storage") check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)")) } + withTable("tmp1", "tmp2") { + sql(s"CREATE TABLE tmp1 (c1 int) $storage") + sql(s"CREATE TABLE tmp2 (c1 int) $storage") + check( + sql( + """FROM VALUES(1),(2),(3) + |INSERT INTO TABLE tmp1 SELECT * + |INSERT INTO TABLE tmp2 SELECT * + |""".stripMargin), + 2) + } + withTable("tmp1") { sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)") } diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala index b987a72aa..33aff09a4 100644 --- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala +++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala @@ -59,6 +59,9 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder query.output.filter(attr => table.partitionColumnNames.contains(attr.name)) c.copy(query = 
buildRepartition(dynamicPartitionColumns, query)) + case u @ Union(children, _, _) => + u.copy(children = children.map(addRepartition)) + case _ => plan } } @@ -98,6 +101,9 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder { query.output.filter(attr => table.partitionColumnNames.contains(attr.name)) c.copy(query = buildRepartition(dynamicPartitionColumns, query)) + case u @ Union(children, _, _) => + u.copy(children = children.map(addRepartition)) + case _ => plan } } @@ -105,6 +111,7 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder { trait RepartitionBeforeWriteHelper { def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan match { case Project(_, child) => canInsertRepartitionByExpression(child) + case SubqueryAlias(_, child) => canInsertRepartitionByExpression(child) case Limit(_, _) => false case _: Sort => false case _: RepartitionByExpression => false