diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala new file mode 100644 index 000000000..715e788ec --- /dev/null +++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kyuubi.sql + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedTableOrView +import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand} + +import org.apache.kyuubi.sql.KyuubiSQLConf._ + +/** Rule that, when the DROP_IGNORE_NONEXISTENT conf is true, makes DROP statements tolerate missing targets: the four drop commands matched with a literal false flag are copied with ifExists = true, and a DropTable whose child is still an UnresolvedTableOrView is replaced by NoopDropTable. NOTE(review): the session constructor parameter is not read in this class; presumably it exists to fit the builder shape expected by injectPostHocResolutionRule - confirm. */ case class DropIgnoreNonexistent(session: SparkSession) extends Rule[LogicalPlan] { + + /* Only the root node of plan is matched (plain match, no tree traversal). Any other plan, and every plan when the conf is false, is returned unchanged. */ override def apply(plan: LogicalPlan): LogicalPlan = { + if (conf.getConf(DROP_IGNORE_NONEXISTENT)) { + plan match { + case i @ AlterTableDropPartitionCommand(_, _, false, _, _) => + i.copy(ifExists = true) + case i @ DropTableCommand(_, false, _, _) => + i.copy(ifExists = true) + case i @ DropDatabaseCommand(_, false, _) => + i.copy(ifExists = true) + case i @ DropFunctionCommand(_, _, false, _) => + i.copy(ifExists = true) + // like: org.apache.spark.sql.catalyst.analysis.ResolveNoopDropTable + case DropTable(u: UnresolvedTableOrView, false, _) => + NoopDropTable(u.multipartIdentifier) + case _ => plan + } + } else { + plan + } + } + +} diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index 03e736be0..3f9193731 100644 --- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -39,6 +39,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { extensions.injectPostHocResolutionRule(RepartitionBeforeWritingDatasource) extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive) extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule) + 
extensions.injectPostHocResolutionRule(DropIgnoreNonexistent) extensions.injectPlannerStrategy(MaxPartitionStrategy) } diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala new file mode 100644 index 000000000..e99afd227 --- /dev/null +++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical.NoopDropTable +import org.apache.spark.sql.execution.command._ + +import org.apache.kyuubi.sql.KyuubiSQLConf + +/** Test suite for the drop-ignore-nonexistent behaviour: with KyuubiSQLConf.DROP_IGNORE_NONEXISTENT set to true it runs DROP statements against missing targets and inspects the root of the analyzed plan. */ class DropIgnoreNonexistentSuite extends KyuubiSparkSQLExtensionTest { + + /* Expected analyzed plans: ifExists is true on DropDatabaseCommand, DropTableCommand (for DROP VIEW), DropFunctionCommand and AlterTableDropPartitionCommand, while DROP TABLE yields a NoopDropTable. The partition case first creates the partitioned table test inside withTable. */ test("drop ignore nonexistent") { + withSQLConf(KyuubiSQLConf.DROP_IGNORE_NONEXISTENT.key -> "true") { + // drop nonexistent database + val df1 = sql("DROP DATABASE nonexistent_database") + assert(df1.queryExecution.analyzed.asInstanceOf[DropDatabaseCommand].ifExists == true) + + // drop nonexistent table + // related org.apache.spark.sql.catalyst.analysis.ResolveNoopDropTable + val df2 = sql("DROP TABLE nonexistent_table") + assert(df2.queryExecution.analyzed.isInstanceOf[NoopDropTable]) + + // drop nonexistent view + val df3 = sql("DROP VIEW nonexistent_view") + assert(df3.queryExecution.analyzed.asInstanceOf[DropTableCommand].ifExists == true) + + // drop nonexistent function + val df4 = sql("DROP FUNCTION nonexistent_function") + assert(df4.queryExecution.analyzed.asInstanceOf[DropFunctionCommand].ifExists == true) + + // drop nonexistent PARTITION + withTable("test") { + sql("CREATE TABLE IF NOT EXISTS test(i int) PARTITIONED BY (p int)") + val df5 = sql("ALTER TABLE test DROP PARTITION (p = 1)") + assert(df5.queryExecution.analyzed + .asInstanceOf[AlterTableDropPartitionCommand].ifExists == true) + } + } + } + +} diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala new file mode 100644 index 000000000..6e07df09f --- /dev/null +++ b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.sql + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedTableOrView, UnresolvedView} +import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, DropDatabaseCommand, DropFunctionCommand, DropTableCommand} + +import org.apache.kyuubi.sql.KyuubiSQLConf._ + +/** Rule that, when the DROP_IGNORE_NONEXISTENT conf is true, makes DROP statements tolerate missing targets: the four drop commands matched with a literal false flag are copied with ifExists = true, and DropTable over UnresolvedTableOrView, DropView over UnresolvedView and UncacheTable over UnresolvedRelation (each with a literal false flag) are replaced by a NoopCommand carrying the statement name and the multipart identifier. NOTE(review): the session constructor parameter is not read in this class; presumably it exists to fit the builder shape expected by injectPostHocResolutionRule - confirm. */ case class DropIgnoreNonexistent(session: SparkSession) extends Rule[LogicalPlan] { + + /* Only the root node of plan is matched (plain match, no tree traversal). Any other plan, and every plan when the conf is false, is returned unchanged. */ override def apply(plan: LogicalPlan): LogicalPlan = { + if (conf.getConf(DROP_IGNORE_NONEXISTENT)) { + plan match { + case i @ AlterTableDropPartitionCommand(_, _, false, _, _) => + i.copy(ifExists = true) + case i @ DropTableCommand(_, false, _, _) => + i.copy(ifExists = true) + case i @ DropDatabaseCommand(_, false, _) => + i.copy(ifExists = true) + case i @ DropFunctionCommand(_, _, false, _) => + i.copy(ifExists = true) + // like: org.apache.spark.sql.catalyst.analysis.ResolveCommandsWithIfExists + case DropTable(u: UnresolvedTableOrView, false, _) => + NoopCommand("DROP TABLE", u.multipartIdentifier) + case DropView(u: UnresolvedView, false) => + 
NoopCommand("DROP VIEW", u.multipartIdentifier) + case UncacheTable(u: UnresolvedRelation, false, _) => + NoopCommand("UNCACHE TABLE", u.multipartIdentifier) + case _ => plan + } + } else { + plan + } + } + +} diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index 2de572fcc..2b8920a25 100644 --- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -32,5 +32,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource) extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive) + extensions.injectPostHocResolutionRule(DropIgnoreNonexistent) } } diff --git a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala new file mode 100644 index 000000000..9fc4522b4 --- /dev/null +++ b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/DropIgnoreNonexistentSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical.NoopCommand +import org.apache.spark.sql.execution.command._ + +import org.apache.kyuubi.sql.KyuubiSQLConf + +/** Test suite for the drop-ignore-nonexistent behaviour: with KyuubiSQLConf.DROP_IGNORE_NONEXISTENT set to true it runs DROP statements against missing targets and inspects the root of the analyzed plan. */ class DropIgnoreNonexistentSuite extends KyuubiSparkSQLExtensionTest { + + /* Expected analyzed plans: ifExists is true on DropDatabaseCommand, DropFunctionCommand and AlterTableDropPartitionCommand, while DROP TABLE and DROP VIEW both yield a NoopCommand. The partition case first creates the partitioned table test inside withTable. */ test("drop ignore nonexistent") { + withSQLConf(KyuubiSQLConf.DROP_IGNORE_NONEXISTENT.key -> "true") { + // drop nonexistent database + val df1 = sql("DROP DATABASE nonexistent_database") + assert(df1.queryExecution.analyzed.asInstanceOf[DropDatabaseCommand].ifExists == true) + + // drop nonexistent table + val df2 = sql("DROP TABLE nonexistent_table") + assert(df2.queryExecution.analyzed.isInstanceOf[NoopCommand]) + + // drop nonexistent view + val df3 = sql("DROP VIEW nonexistent_view") + assert(df3.queryExecution.analyzed.isInstanceOf[NoopCommand]) + + // drop nonexistent function + val df4 = sql("DROP FUNCTION nonexistent_function") + assert(df4.queryExecution.analyzed.asInstanceOf[DropFunctionCommand].ifExists == true) + + // drop nonexistent PARTITION + withTable("test") { + sql("CREATE TABLE IF NOT EXISTS test(i int) PARTITIONED BY (p int)") + val df5 = sql("ALTER TABLE test DROP PARTITION (p = 1)") + assert(df5.queryExecution.analyzed + .asInstanceOf[AlterTableDropPartitionCommand].ifExists == true) + } + } + } + +} diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index cf63a6e27..e3b6c9106 100644 --- 
a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -114,4 +114,12 @@ object KyuubiSQLConf { .version("1.4.0") .intConf .createOptional + + /* Boolean conf entry, default false, keyed spark.sql.optimizer.dropIgnoreNonExistent. The DropIgnoreNonexistent rules read it through conf.getConf to decide whether DROP statements should tolerate missing targets. */ val DROP_IGNORE_NONEXISTENT = + buildConf("spark.sql.optimizer.dropIgnoreNonExistent") + .doc("Do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " + + "a non-existent database/table/view/function/partition") + .version("1.5.0") + .booleanConf + .createWithDefault(false) } diff --git a/docs/sql/rules.md b/docs/sql/rules.md index c502d848e..f79c1cc7e 100644 --- a/docs/sql/rules.md +++ b/docs/sql/rules.md @@ -77,3 +77,4 @@ spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi eng spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 +spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0