From f3f643a309a04fd4be848228c9227bc27ffa4b8d Mon Sep 17 00:00:00 2001
From: "kandy01.wang"
Date: Thu, 7 Dec 2023 11:10:57 +0800
Subject: [PATCH] [KYUUBI #5756] Introduce specified initialized SQL to every
 engine
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# :mag: Description

## Issue References 🔗

This pull request fixes #5756

## Describe Your Solution 🔧

Introduce engine-specific initialize SQL configurations that fall back to the generic ones:

- `kyuubi.engine.flink.initialize.sql` and `kyuubi.engine.spark.initialize.sql` fall back to `kyuubi.engine.initialize.sql`
- `kyuubi.session.engine.flink.initialize.sql` and `kyuubi.session.engine.spark.initialize.sql` fall back to `kyuubi.engine.session.initialize.sql`

## Types of changes :bookmark:

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request :coffin:

#### Behavior With This Pull Request :tada:

#### Related Unit Tests

`FlinkEngineInitializeSuite`, `SingleSessionSuite`, `SparkEngineSuites` and `InitializeSQLSuite` cover the new configurations.

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5821 from hadoopkandy/KYUUBI-5756.
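For reference, the new keys can be set in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`. The statements below are illustrative values, not defaults shipped by this patch; each key accepts a semicolon-separated list of SQL statements:

```properties
# Generic initialize SQL, used when no engine-specific key is set
kyuubi.engine.initialize.sql=SHOW DATABASES
# Spark-specific override; takes precedence over the generic key above
kyuubi.engine.spark.initialize.sql=USE warehouse;SHOW TABLES
# Flink-specific per-session initialize SQL, falling back to
# kyuubi.engine.session.initialize.sql when unset
kyuubi.session.engine.flink.initialize.sql=USE CATALOG default_catalog
```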
Closes #5756 046fe2a58 [kandy01.wang] [KYUUBI #5756] Introduce specified initialized SQL to every engine Authored-by: kandy01.wang Signed-off-by: Cheng Pan --- docs/configuration/settings.md | 4 +++ .../kyuubi/engine/flink/FlinkSQLEngine.scala | 4 +-- .../flink/session/FlinkSessionImpl.scala | 4 +-- .../FlinkEngineInitializeSuite.scala | 4 +-- .../kyuubi/engine/spark/SparkSQLEngine.scala | 3 ++- .../session/SparkSQLSessionManager.scala | 4 ++- .../engine/spark/IndividualSparkSuite.scala | 2 +- .../spark/session/SingleSessionSuite.scala | 2 +- .../org/apache/kyuubi/config/KyuubiConf.scala | 26 +++++++++++++++++++ .../engine/spark/InitializeSQLSuite.scala | 6 ++--- 10 files changed, 46 insertions(+), 13 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 580ad1700..651829b4f 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -140,6 +140,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 | | kyuubi.engine.flink.application.jars | <undefined> | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 | | kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 | +| kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialization SQL for the Flink engine. It falls back to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | | kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | | kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | | kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | @@ -174,6 +175,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 | | kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 | | kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | +| kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialization SQL for the Spark engine. It falls back to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | | kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | | kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | | kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | @@ -427,6 +429,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 | | kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 | | kyuubi.session.engine.flink.fetch.timeout | <undefined> | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the current fetched would be returned. If no data are fetched, a TimeoutException would be thrown. | duration | 1.8.0 | +| kyuubi.session.engine.flink.initialize.sql || The initialization SQL for Flink sessions. It falls back to `kyuubi.engine.session.initialize.sql`. | seq | 1.8.1 | | kyuubi.session.engine.flink.main.resource | <undefined> | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 | | kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. 
| int | 1.5.0 | | kyuubi.session.engine.hive.main.resource | <undefined> | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default | string | 1.6.0 | @@ -438,6 +441,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.session.engine.open.max.attempts | 9 | The number of times an open engine will retry when encountering a special error. | int | 1.7.0 | | kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 | | kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 | +| kyuubi.session.engine.spark.initialize.sql || The initialization SQL for Spark sessions. It falls back to `kyuubi.engine.session.initialize.sql`. | seq | 1.8.1 | | kyuubi.session.engine.spark.main.resource | <undefined> | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 | | kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 | | kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. 
| duration | 1.6.0 | diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index db50c1c33..dff9aa602 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL +import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_INITIALIZE_SQL import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY} import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine} import org.apache.kyuubi.service.Serverable @@ -139,7 +139,7 @@ object FlinkSQLEngine extends Logging { tableEnv.executeSql("select 'kyuubi'").await() } - kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt => + kyuubiConf.get(ENGINE_FLINK_INITIALIZE_SQL).foreach { stmt => tableEnv.executeSql(stmt).await() } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 5bfacc694..2dfb57848 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -65,12 +65,12 @@ class FlinkSessionImpl( override def open(): Unit = { val executor = 
fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig)) - sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql => + sessionManager.getConf.get(ENGINE_SESSION_FLINK_INITIALIZE_SQL).foreach { sql => try { executor.executeStatement(OperationHandle.create, sql) } catch { case NonFatal(e) => - throw KyuubiSQLException(s"execute ${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e) + throw KyuubiSQLException(s"execute ${ENGINE_SESSION_FLINK_INITIALIZE_SQL.key} $sql ", e) } } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala index db174e501..c98d07cc4 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkEngineInitializeSuite.scala @@ -53,8 +53,8 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper ENGINE_TYPE.key -> "FLINK_SQL", ENGINE_SHARE_LEVEL.key -> shareLevel, OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name, - ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE, - ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE, + ENGINE_FLINK_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE, + ENGINE_SESSION_FLINK_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE, KYUUBI_SESSION_USER_KEY -> "kandy") } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index dbf5075a1..bf7be14b8 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -290,7 +290,8 @@ object SparkSQLEngine extends Logging { KyuubiSparkUtil.initializeSparkSession( session, - kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL)) + kyuubiConf.get(ENGINE_SPARK_INITIALIZE_SQL) ++ kyuubiConf.get( + ENGINE_SESSION_SPARK_INITIALIZE_SQL)) session.sparkContext.setLocalProperty(KYUUBI_ENGINE_URL, KyuubiSparkUtil.engineUrl) session } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 7ebcc8d37..a66376f2c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -130,7 +130,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) private def newSparkSession(rootSparkSession: SparkSession): SparkSession = { val newSparkSession = rootSparkSession.newSession() - KyuubiSparkUtil.initializeSparkSession(newSparkSession, conf.get(ENGINE_SESSION_INITIALIZE_SQL)) + KyuubiSparkUtil.initializeSparkSession( + newSparkSession, + conf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL)) newSparkSession } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala index 8376705ef..e924aa3de 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala @@ -104,7 +104,7 @@ class 
SparkEngineSuites extends KyuubiFunSuite { withSystemProperty(Map( s"spark.$KYUUBI_ENGINE_SUBMIT_TIME_KEY" -> String.valueOf(submitTime), s"spark.${ENGINE_INIT_TIMEOUT.key}" -> String.valueOf(timeout), - s"spark.${ENGINE_INITIALIZE_SQL.key}" -> + s"spark.${ENGINE_SPARK_INITIALIZE_SQL.key}" -> "select 1 where java_method('java.lang.Thread', 'sleep', 60000L) is null")) { SparkSQLEngine.setupConf() SparkSQLEngine.currentEngine = None diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala index 0f0e07411..82a85bfcf 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala @@ -28,7 +28,7 @@ class SingleSessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { ENGINE_SHARE_LEVEL.key -> "SERVER", ENGINE_SINGLE_SPARK_SESSION.key -> "true", ( - ENGINE_SESSION_INITIALIZE_SQL.key, + ENGINE_SESSION_SPARK_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB_SOLO;" + "CREATE TABLE IF NOT EXISTS INIT_DB_SOLO.test(a int) USING CSV;" + "INSERT INTO INIT_DB_SOLO.test VALUES (2);")) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 323fd222c..dcd84e7be 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2140,6 +2140,13 @@ object KyuubiConf { .toSequence(";") .createWithDefault(Nil) + val ENGINE_SESSION_FLINK_INITIALIZE_SQL: ConfigEntry[Seq[String]] = + buildConf("kyuubi.session.engine.flink.initialize.sql") + .doc("The initialization SQL for Flink sessions. 
" + "It falls back to `kyuubi.engine.session.initialize.sql`.") + .version("1.8.1") + .fallbackConf(ENGINE_SESSION_INITIALIZE_SQL) + val ENGINE_DEREGISTER_EXCEPTION_CLASSES: ConfigEntry[Set[String]] = buildConf("kyuubi.engine.deregister.exception.classes") .doc("A comma-separated list of exception classes. If there is any exception thrown," + @@ -2583,6 +2590,13 @@ object KyuubiConf { .stringConf .createWithDefault("yyyy-MM-dd HH:mm:ss.SSS") + val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] = + buildConf("kyuubi.session.engine.spark.initialize.sql") + .doc("The initialization SQL for Spark sessions. " + "It falls back to `kyuubi.engine.session.initialize.sql`.") + .version("1.8.1") + .fallbackConf(ENGINE_SESSION_INITIALIZE_SQL) + val ENGINE_TRINO_MEMORY: ConfigEntry[String] = buildConf("kyuubi.engine.trino.memory") .doc("The heap memory for the Trino query engine") @@ -2657,6 +2671,12 @@ object KyuubiConf { .stringConf .createOptional + val ENGINE_FLINK_INITIALIZE_SQL: ConfigEntry[Seq[String]] = + buildConf("kyuubi.engine.flink.initialize.sql") + .doc("The initialization SQL for the Flink engine. It falls back to `kyuubi.engine.initialize.sql`.") + .version("1.8.1") + .fallbackConf(ENGINE_INITIALIZE_SQL) + val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] = buildConf("kyuubi.server.limit.connections.per.user") .doc("Maximum kyuubi server connections per user." + @@ -3154,6 +3174,12 @@ object KyuubiConf { .toSequence() .createWithDefault(Seq("spark.driver.memory", "spark.executor.memory")) + val ENGINE_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] = + buildConf("kyuubi.engine.spark.initialize.sql") + .doc("The initialization SQL for the Spark engine. 
It falls back to `kyuubi.engine.initialize.sql`.") + .version("1.8.1") + .fallbackConf(ENGINE_INITIALIZE_SQL) + val ENGINE_HIVE_EVENT_LOGGERS: ConfigEntry[Seq[String]] = buildConf("kyuubi.engine.hive.event.loggers") .doc("A comma-separated list of engine history loggers, where engine/session/operation etc" + diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala index 10d662467..e119d9802 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala @@ -19,19 +19,19 @@ package org.apache.kyuubi.engine.spark import org.apache.kyuubi.WithKyuubiServer import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INITIALIZE_SQL, ENGINE_SESSION_INITIALIZE_SQL} +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SESSION_SPARK_INITIALIZE_SQL, ENGINE_SPARK_INITIALIZE_SQL} import org.apache.kyuubi.operation.HiveJDBCTestHelper class InitializeSQLSuite extends WithKyuubiServer with HiveJDBCTestHelper { override protected val conf: KyuubiConf = { KyuubiConf() .set( - ENGINE_INITIALIZE_SQL.key, + ENGINE_SPARK_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB;" + "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);") .set( - ENGINE_SESSION_INITIALIZE_SQL.key, + ENGINE_SESSION_SPARK_INITIALIZE_SQL.key, "CREATE DATABASE IF NOT EXISTS INIT_DB;" + "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" + "INSERT INTO INIT_DB.test VALUES (2);")
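The fallback chain this patch wires up via `fallbackConf` can be modeled with a short sketch. The key names below match the patch; the `resolve` function itself is a simplified, hypothetical stand-in for Kyuubi's `ConfigEntry` resolution, not actual Kyuubi code:

```python
# Default of the generic entry (the engine-specific entries inherit it).
DEFAULTS = {"kyuubi.engine.initialize.sql": ["SHOW DATABASES"]}

# Engine-specific key -> generic key it falls back to, as in the patch.
FALLBACKS = {
    "kyuubi.engine.spark.initialize.sql": "kyuubi.engine.initialize.sql",
    "kyuubi.engine.flink.initialize.sql": "kyuubi.engine.initialize.sql",
}

def resolve(conf: dict, key: str):
    """Return key's value: explicit setting, then fallback key, then default."""
    if key in conf:
        return conf[key]
    parent = FALLBACKS.get(key)
    if parent and parent in conf:
        return conf[parent]
    return DEFAULTS.get(parent or key)

# The engine-specific key wins when set; otherwise the generic key applies.
conf = {"kyuubi.engine.initialize.sql": ["USE prod"]}
print(resolve(conf, "kyuubi.engine.spark.initialize.sql"))  # ['USE prod']
conf["kyuubi.engine.spark.initialize.sql"] = ["USE spark_db"]
print(resolve(conf, "kyuubi.engine.spark.initialize.sql"))  # ['USE spark_db']
```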