/**
 * Switch for the SQL classification rule.
 *
 * While this is true, the `KyuubiSqlClassification` rule stores the classification of each
 * resolved plan in the session configuration under the key `kyuubi.spark.sql.classification`.
 */
val SQL_CLASSIFICATION_ENABLED =
  buildConf("kyuubi.spark.sql.classification.enabled")
    .doc("When true, allows Kyuubi engine to judge this SQL's classification " +
      "and set it into sessionConf. " +
      "Through this configuration item, Spark can optimize configuration dynamically.")
    .version("1.4.0")
    .booleanConf
    .createWithDefault(true)
/**
 * Looks up the classification of a SQL statement from the simple class name of its
 * logical plan.
 *
 * The name-to-classification mapping is read from the classpath resource
 * `sql-classification-default.json`. The resource is parsed on the first lookup, and a lookup
 * is only allowed while `kyuubi.spark.sql.classification.enabled` is true.
 */
object KyuubiGetSqlClassification {

  import scala.util.control.NonFatal

  private val ClassificationResource = "sql-classification-default.json"

  /**
   * Parsed content of the classification resource.
   *
   * Lazy on purpose: a missing or malformed resource is reported by the first lookup as an
   * IllegalArgumentException instead of failing the initialisation of this object.
   */
  private lazy val jsonNode: JsonNode = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    // getResource returns null when the resource is absent; turn that into a clear error.
    val resource = Option(loader.getResource(ClassificationResource)).getOrElse(
      throw new IllegalArgumentException(s"$ClassificationResource does not exist."))
    try {
      // Read through the URL rather than java.io.File: a resource packaged inside a jar has
      // no file-system path, so `new File(url.getPath)` cannot open it.
      new ObjectMapper().readTree(resource)
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Failed to load $ClassificationResource.", e)
    }
  }

  /**
   * Returns the classification configured for a logical plan class.
   *
   * Notice:
   * You need to make sure that the configuration item: kyuubi.spark.sql.classification.enabled
   * is true. The flag is checked on every call, so toggling it takes effect immediately.
   *
   * @param simpleName the analyzed logical plan's getSimpleName
   * @return this sql's classification, or "undefined" when the name is not listed
   * @throws IllegalArgumentException if the feature is disabled or the resource cannot be loaded
   */
  def getSqlClassification(simpleName: String): String = {
    if (!SQLConf.get.getConf(SQL_CLASSIFICATION_ENABLED)) {
      throw new IllegalArgumentException(
        "The configuration item: kyuubi.spark.sql.classification.enabled is false")
    }
    // JsonNode.get yields null for an unknown field name.
    Option(jsonNode.get(simpleName)).fold("undefined")(_.asText())
  }
}
/**
 * Rule that records the classification of a resolved plan in the session configuration.
 *
 * When `SQL_CLASSIFICATION_ENABLED` is on and the plan is resolved, the simple class name of
 * the plan is looked up through [[KyuubiGetSqlClassification]] and the result is written to
 * the session conf key `kyuubi.spark.sql.classification`. The plan itself is never modified.
 *
 * @param session the session whose runtime configuration receives the classification
 */
case class KyuubiSqlClassification(session: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Skip when the feature is off; `plan.resolved` is only consulted when it is on.
    val skip = !conf.getConf(SQL_CLASSIFICATION_ENABLED) || !plan.resolved
    if (!skip) {
      val planName = plan.getClass.getSimpleName
      val label = KyuubiGetSqlClassification.getSqlClassification(planName)
      session.conf.set("kyuubi.spark.sql.classification", label)
    }
    // This rule only observes the plan, so hand it back untouched.
    plan
  }
}
test("get simple name for DDL") {

  // Parses and analyzes `statement` without executing it and returns the simple class name
  // of the plan handed back by the analyzer.
  def analyzedSimpleName(statement: String): String =
    spark.sessionState.analyzer.execute(
      spark.sessionState.sqlParser.parsePlan(statement)).getClass.getSimpleName

  // Statements analyzed before the StudentInfo table is created.
  val beforeStudentInfo = Seq(
    // CREATE DATABASE
    "CREATE DATABASE inventory;",
    // ALTER DATABASE
    "ALTER DATABASE inventory SET DBPROPERTIES " +
      "('Edited-by' = 'John', 'Edit-date' = '01/01/2001');",
    // CREATE TABLE
    "CREATE TABLE student (name VARCHAR(64), rollno INT, age INT) " +
      "USING PARQUET PARTITIONED BY (age);",
    // INSERT
    "INSERT INTO student VALUES " +
      "('zhang', 1, 10),('yu', 2, 11),('xiang', 3, 12),('zhangyuxiang', 4, 17);",
    // ALTER TABLE RENAME
    "ALTER TABLE Student RENAME TO StudentInfo;",
    // ALTER TABLE RENAME PARTITION
    "ALTER TABLE default.StudentInfo PARTITION (age='10') " +
      "RENAME TO PARTITION (age='15');")

  // Statements analyzed once StudentInfo exists.
  val afterStudentInfo = Seq(
    // ALTER TABLE ADD COLUMNS
    "ALTER TABLE StudentInfo ADD columns (LastName string, DOB timestamp);",
    // ALTER TABLE ALTER COLUMN
    "ALTER TABLE StudentInfo ALTER COLUMN age COMMENT \"new comment\";",
    // ALTER TABLE CHANGE COLUMN
    "ALTER TABLE StudentInfo CHANGE COLUMN age COMMENT \"new comment123\";",
    // ALTER TABLE ADD PARTITION
    "ALTER TABLE StudentInfo ADD IF NOT EXISTS PARTITION (age=18);",
    // ALTER TABLE DROP PARTITION
    "ALTER TABLE StudentInfo DROP IF EXISTS PARTITION (age=18);",
    // CREATE VIEW
    "CREATE OR REPLACE VIEW studentinfo_view " +
      "AS SELECT name, rollno FROM studentinfo;",
    // ALTER VIEW RENAME TO
    "ALTER VIEW studentinfo_view RENAME TO studentinfo_view2;",
    // ALTER VIEW SET TBLPROPERTIES
    "ALTER VIEW studentinfo_view2 SET TBLPROPERTIES " +
      "('created.by.user' = \"zhangyuxiang\", 'created.date' = '08-20-2021' );",
    // ALTER VIEW UNSET TBLPROPERTIES
    "ALTER VIEW studentinfo_view2 UNSET TBLPROPERTIES " +
      "('created.by.user', 'created.date');",
    // ALTER VIEW AS SELECT
    "ALTER VIEW studentinfo_view2 AS SELECT * FROM studentinfo;",
    // CREATE DATASOURCE TABLE AS SELECT
    "CREATE TABLE student_copy USING CSV AS SELECT * FROM studentinfo;",
    // CREATE DATASOURCE TABLE AS LIKE
    "CREATE TABLE Student_Dupli like studentinfo;",
    // USE DATABASE
    "USE default;",
    // DROP DATABASE
    "DROP DATABASE IF EXISTS inventory_db CASCADE;",
    // CREATE FUNCTION
    "CREATE FUNCTION test_avg AS " +
      "'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage';",
    // DROP FUNCTION
    "DROP FUNCTION test_avg;")

  // Statements analyzed once studentabc exists as well.
  val afterStudentAbc = Seq(
    // DROP TABLE
    "DROP TABLE IF EXISTS studentabc;",
    // DROP VIEW
    "DROP VIEW studentinfo_view2;",
    // TRUNCATE TABLE
    "TRUNCATE TABLE StudentInfo;",
    // REPAIR TABLE
    "MSCK REPAIR TABLE StudentInfo;")

  // The two tables below are really created, so drop them again when the test finishes
  // instead of leaking them into the catalog shared by the rest of the suite.
  withTable("StudentInfo", "studentabc") {
    // Only analysis happens here; none of these statements is executed.
    val firstNames = beforeStudentInfo.map(analyzedSimpleName)

    spark.sql("CREATE TABLE IF NOT EXISTS StudentInfo " +
      "(name VARCHAR(64), rollno INT, age INT) USING PARQUET PARTITIONED BY (age);")
    val secondNames = afterStudentInfo.map(analyzedSimpleName)

    spark.sql("CREATE TABLE IF NOT EXISTS studentabc (name VARCHAR(64), rollno INT, age INT) " +
      "USING PARQUET PARTITIONED BY (age);")
    val thirdNames = afterStudentAbc.map(analyzedSimpleName)

    val ddlSimpleName: Set[String] = (firstNames ++ secondNames ++ thirdNames).toSet
    // scalastyle:off println
    println("ddl simple name is :" + ddlSimpleName)
    // scalastyle:on println
  }
}

test("Sql classification for ddl") {
  withSQLConf(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key -> "true") {
    withDatabase("inventory") {
      // A DDL statement must be classified as "ddl".
      val df = sql("CREATE DATABASE inventory;")
      assert(df.sparkSession.conf.get("kyuubi.spark.sql.classification") === "ddl")
    }
    // A plain query must not keep the "ddl" classification.
    val df = sql("select timestamp'2021-06-01'")
    assert(df.sparkSession.conf.get("kyuubi.spark.sql.classification") !== "ddl")
  }
}