[KYUUBI #1577] Add DropIgnoreNonexistent Rule.

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Like the `hive.exec.drop.ignorenonexistent` configuration in Hive, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition. For details: #1577.

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1583 from wForget/KYUUBI-1577.

Closes #1577

63382660 [Wang Zhen] delete useless comment
adadf424 [Wang Zhen] add DropIgnoreNonexistent rule using injectPostHocResolutionRule
860b15d3 [Wang Zhen] add DropIgnoreNonexistent rule for spark 3.2
aad43cdb [Wang Zhen] adjust code
ca1b9f11 [Wang Zhen] make some adjustments and update rules.md
804bf40e [Wang Zhen] [KYUUBI-1577] Add DropIgnoreNonexistent Rule.

Authored-by: Wang Zhen <wangzhen07@qiyi.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
Wang Zhen 2021-12-20 11:46:25 +08:00 committed by ulysses-you
parent f3dc1fdecd
commit df1d9f3bb2
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
8 changed files with 224 additions and 0 deletions

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedTableOrView
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand}
import org.apache.kyuubi.sql.KyuubiSQLConf._
case class DropIgnoreNonexistent(session: SparkSession) extends Rule[LogicalPlan] {

  /**
   * When [[DROP_IGNORE_NONEXISTENT]] is enabled, rewrites DROP
   * DATABASE/TABLE/VIEW/FUNCTION/PARTITION commands that were issued
   * without IF EXISTS so that a missing target no longer fails analysis,
   * mirroring Hive's `hive.exec.drop.ignorenonexistent` behavior.
   */
  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (!conf.getConf(DROP_IGNORE_NONEXISTENT)) {
      plan
    } else {
      plan match {
        case cmd: AlterTableDropPartitionCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        case cmd: DropTableCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        case cmd: DropDatabaseCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        case cmd: DropFunctionCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        // like: org.apache.spark.sql.catalyst.analysis.ResolveNoopDropTable
        case DropTable(target: UnresolvedTableOrView, false, _) =>
          NoopDropTable(target.multipartIdentifier)
        case other => other
      }
    }
  }
}

View File

@ -39,6 +39,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule(RepartitionBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive)
extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
extensions.injectPostHocResolutionRule(DropIgnoreNonexistent)
extensions.injectPlannerStrategy(MaxPartitionStrategy)
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.NoopDropTable
import org.apache.spark.sql.execution.command._
import org.apache.kyuubi.sql.KyuubiSQLConf
class DropIgnoreNonexistentSuite extends KyuubiSparkSQLExtensionTest {
  test("drop ignore nonexistent") {
    withSQLConf(KyuubiSQLConf.DROP_IGNORE_NONEXISTENT.key -> "true") {
      // DROP DATABASE on a missing database is rewritten with ifExists = true.
      val dropDb = sql("DROP DATABASE nonexistent_database")
      assert(dropDb.queryExecution.analyzed.asInstanceOf[DropDatabaseCommand].ifExists)

      // DROP TABLE on a missing table is resolved to a no-op plan,
      // related org.apache.spark.sql.catalyst.analysis.ResolveNoopDropTable.
      val dropTbl = sql("DROP TABLE nonexistent_table")
      assert(dropTbl.queryExecution.analyzed.isInstanceOf[NoopDropTable])

      // DROP VIEW on a missing view is rewritten with ifExists = true.
      val dropView = sql("DROP VIEW nonexistent_view")
      assert(dropView.queryExecution.analyzed.asInstanceOf[DropTableCommand].ifExists)

      // DROP FUNCTION on a missing function is rewritten with ifExists = true.
      val dropFunc = sql("DROP FUNCTION nonexistent_function")
      assert(dropFunc.queryExecution.analyzed.asInstanceOf[DropFunctionCommand].ifExists)

      // DROP PARTITION on a missing partition of an existing table is
      // rewritten with ifExists = true.
      withTable("test") {
        sql("CREATE TABLE IF NOT EXISTS test(i int) PARTITIONED BY (p int)")
        val dropPart = sql("ALTER TABLE test DROP PARTITION (p = 1)")
        assert(dropPart.queryExecution.analyzed
          .asInstanceOf[AlterTableDropPartitionCommand].ifExists)
      }
    }
  }
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand}
import org.apache.kyuubi.sql.KyuubiSQLConf._
case class DropIgnoreNonexistent(session: SparkSession) extends Rule[LogicalPlan] {

  /**
   * When [[DROP_IGNORE_NONEXISTENT]] is enabled, rewrites DROP
   * DATABASE/TABLE/VIEW/FUNCTION/PARTITION (and UNCACHE TABLE) commands that
   * were issued without IF EXISTS so that a missing target no longer fails
   * analysis, mirroring Hive's `hive.exec.drop.ignorenonexistent` behavior.
   */
  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (!conf.getConf(DROP_IGNORE_NONEXISTENT)) {
      plan
    } else {
      plan match {
        case cmd: AlterTableDropPartitionCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        case cmd: DropTableCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        case cmd: DropDatabaseCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        case cmd: DropFunctionCommand if !cmd.ifExists =>
          cmd.copy(ifExists = true)
        // like: org.apache.spark.sql.catalyst.analysis.ResolveCommandsWithIfExists
        case DropTable(target: UnresolvedTableOrView, false, _) =>
          NoopCommand("DROP TABLE", target.multipartIdentifier)
        case DropView(target: UnresolvedView, false) =>
          NoopCommand("DROP VIEW", target.multipartIdentifier)
        case UncacheTable(target: UnresolvedRelation, false, _) =>
          NoopCommand("UNCACHE TABLE", target.multipartIdentifier)
        case other => other
      }
    }
  }
}

View File

@ -32,5 +32,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
extensions.injectPostHocResolutionRule(DropIgnoreNonexistent)
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.NoopCommand
import org.apache.spark.sql.execution.command._
import org.apache.kyuubi.sql.KyuubiSQLConf
class DropIgnoreNonexistentSuite extends KyuubiSparkSQLExtensionTest {
  test("drop ignore nonexistent") {
    withSQLConf(KyuubiSQLConf.DROP_IGNORE_NONEXISTENT.key -> "true") {
      // DROP DATABASE on a missing database is rewritten with ifExists = true.
      val dropDb = sql("DROP DATABASE nonexistent_database")
      assert(dropDb.queryExecution.analyzed.asInstanceOf[DropDatabaseCommand].ifExists)

      // DROP TABLE on a missing table is resolved to a no-op command.
      val dropTbl = sql("DROP TABLE nonexistent_table")
      assert(dropTbl.queryExecution.analyzed.isInstanceOf[NoopCommand])

      // DROP VIEW on a missing view is likewise resolved to a no-op command.
      val dropView = sql("DROP VIEW nonexistent_view")
      assert(dropView.queryExecution.analyzed.isInstanceOf[NoopCommand])

      // DROP FUNCTION on a missing function is rewritten with ifExists = true.
      val dropFunc = sql("DROP FUNCTION nonexistent_function")
      assert(dropFunc.queryExecution.analyzed.asInstanceOf[DropFunctionCommand].ifExists)

      // DROP PARTITION on a missing partition of an existing table is
      // rewritten with ifExists = true.
      withTable("test") {
        sql("CREATE TABLE IF NOT EXISTS test(i int) PARTITIONED BY (p int)")
        val dropPart = sql("ALTER TABLE test DROP PARTITION (p = 1)")
        assert(dropPart.queryExecution.analyzed
          .asInstanceOf[AlterTableDropPartitionCommand].ifExists)
      }
    }
  }
}

View File

@ -114,4 +114,12 @@ object KyuubiSQLConf {
.version("1.4.0")
.intConf
.createOptional
// Mirrors Hive's `hive.exec.drop.ignorenonexistent`: when true, DROP
// DATABASE/TABLE/VIEW/FUNCTION/PARTITION statements whose target does not
// exist are rewritten (by the DropIgnoreNonexistent rule) instead of failing.
// Disabled by default to preserve vanilla Spark error behavior.
val DROP_IGNORE_NONEXISTENT =
buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
.doc("Do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
"a non-existent database/table/view/function/partition")
.version("1.5.0")
.booleanConf
.createWithDefault(false)
}

View File

@ -77,3 +77,4 @@ spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi eng
spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0
spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0
spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0
spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0