From acb4f299069a093dcc111b80c9022e2b3e897cfe Mon Sep 17 00:00:00 2001 From: timothy65535 Date: Fri, 27 Aug 2021 11:18:00 +0800 Subject: [PATCH] [KYUUBI #962] Support engine pool feature ### _Why are the changes needed?_ For detail, please go to _Originally posted by yaooqinn in https://github.com/apache/incubator-kyuubi/issues/962#issuecomment-905116261_ ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request ``` ./bin/beeline -u 'jdbc:hive2://localhost:10009/;?kyuubi.engine.pool.enabled=true;kyuubi.engine.pool.size=3;' ``` Closes #986 from timothy65535/ky-962-v2. Closes #962 b79f8778 [timothy65535] fix test name 9a435607 [timothy65535] fix style abc746b8 [timothy65535] update tests 71b6933d [timothy65535] revert changes 018dee8b [timothy65535] revert changes e3492934 [timothy65535] revert changes 67fc8fd9 [timothy65535] improve threshold fa52acb2 [timothy65535] update pool threshold d8b5977d [timothy65535] remove dup get 7f39b8d7 [timothy65535] fix test error 4fdf8d76 [timothy65535] update thread pool size to 1 -> threshold 2477348a [timothy65535] rich engine pool tests 8eee427f [timothy65535] fix update error msg 104be5ea [timothy65535] fix error tests 8c2d1441 [timothy65535] fix doc ddd0eda9 [timothy65535] improve doc & tests 83d7c5cf [timothy65535] update doc 4b6c7ee3 [timothy65535] improve subdomain 169d9d06 [timothy65535] update todo 1424e565 [timothy65535] increase pool size range 3523cd6c [timothy65535] update todo 2cbeafb2 [timothy65535] remove enable option 424b5364 [timothy65535] revert ServiceDiscovery 6fc1c510 [timothy65535] remove random policy 9feef8f3 [timothy65535] [KYUUBI #962] Support engine pool feature Authored-by: timothy65535 Signed-off-by: Kent Yao --- docs/deployment/settings.md | 4 +- .../org/apache/kyuubi/config/KyuubiConf.scala | 23 +++++- .../org/apache/kyuubi/engine/EngineRef.scala | 31 +++++++- .../kyuubi/session/KyuubiSessionImpl.scala | 2 + .../apache/kyuubi/engine/EngineRefSuite.scala | 29 +++++++ .../engine/spark/SparkSqlEngineSuite.scala | 2 +- .../KyuubiOperationEnginePoolSuite.scala | 76 +++++++++++++++++++ .../KyuubiOperationPerUserSuite.scala | 33 ++++++++ 8 files changed, 191 insertions(+), 9 deletions(-) create mode 100644 kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index c87796fc2..35b8a2860 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -168,9 +168,11 @@ kyuubi\.engine
\.deregister\.job\.max
\.failures|
/tmp/kyuubi/events
|
The location of all the engine events go for the builtin JSON logger.
|
string
|
1.3.0
kyuubi\.engine\.event
\.loggers|
|
A comma separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the spark history events
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
|
seq
|
1.3.0
kyuubi\.engine
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
seq
|
1.2.0
+kyuubi\.engine\.pool
\.size|
-1
|
The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|
int
|
1.4.0
+kyuubi\.engine\.pool
\.size\.threshold|
9
|
This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.
|
int
|
1.4.0
kyuubi\.engine\.session
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
seq
|
1.3.0
kyuubi\.engine\.share
\.level|
USER
|
Engines will be shared in different levels, available configs are:
  • CONNECTION: engine will not be shared but only used by the current client connection
  • USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.sub.domain
  • SERVER: the App will be shared by Kyuubi servers
|
string
|
1.2.0
-kyuubi\.engine\.share
\.level\.sub\.domain|
<undefined>
|
Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level
|
string
|
1.2.0
+kyuubi\.engine\.share
\.level\.sub\.domain|
<undefined>
|
Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level
|
string
|
1.2.0
kyuubi\.engine\.single
\.spark\.session|
false
|
When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.
|
boolean
|
1.3.0
kyuubi\.engine\.ui\.stop
\.enabled|
true
|
When true, allows Kyuubi engine to be killed from the Spark Web UI.
|
boolean
|
1.3.0
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6073f5fb1..7627ef25d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -572,12 +572,12 @@ object KyuubiConf { .checkValues(ShareLevel.values.map(_.toString)) .createWithDefault(ShareLevel.USER.toString) - private val validEngineSubDomain: Pattern = "^[a-zA-Z_]{1,10}$".r.pattern + private val validEngineSubDomain: Pattern = "^[a-zA-Z_-]{1,14}$".r.pattern val ENGINE_SHARE_LEVEL_SUB_DOMAIN: OptionalConfigEntry[String] = buildConf("engine.share.level.sub.domain") .doc("Allow end-users to create a sub-domain for the share level of an engine. A" + - " sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form." + + " sub-domain is a case-insensitive string values in `^[a-zA-Z_-]{1,14}$` form." + " For example, for `USER` share level, an end-user can share a certain engine within" + " a sub-domain, not for all of its clients. End-users are free to create multiple" + " engines in the `USER` share level") @@ -585,7 +585,7 @@ object KyuubiConf { .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValue(validEngineSubDomain.matcher(_).matches(), - "must be [1, 10] length alphabet string, e.g. 'abc', 'apache'") + "must be [1, 14] length alphabet string, e.g. 'abc', 'apache'") .createOptional val ENGINE_CONNECTION_URL_USE_HOSTNAME: ConfigEntry[Boolean] = @@ -606,6 +606,23 @@ object KyuubiConf { .version("1.2.0") .fallbackConf(LEGACY_ENGINE_SHARE_LEVEL) + val ENGINE_POOL_SIZE_THRESHOLD: ConfigEntry[Int] = buildConf("engine.pool.size.threshold") + .doc("This parameter is introduced as a server-side parameter, " + + "and controls the upper limit of the engine pool.") + .version("1.4.0") + .intConf + .checkValue(s => s > 0 && s < 33, "Invalid engine pool threshold, it should be in [1, 32]") + .createWithDefault(9) + + val ENGINE_POOL_SIZE: ConfigEntry[Int] = buildConf("engine.pool.size") + .doc("The size of engine pool. Note that, " + + "if the size is less than 1, the engine pool will not be enabled; " + + "otherwise, the size of the engine pool will be " + + s"min(this, ${ENGINE_POOL_SIZE_THRESHOLD.key}).") + .version("1.4.0") + .intConf + .createWithDefault(-1) + val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] = buildConf("engine.initialize.sql") .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index ee34b2611..0eecccb5e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -19,6 +19,8 @@ package org.apache.kyuubi.engine import java.util.concurrent.TimeUnit +import scala.util.Random + import com.codahale.metrics.MetricRegistry import com.google.common.annotations.VisibleForTesting import org.apache.curator.framework.CuratorFramework @@ -27,7 +29,7 @@ import org.apache.curator.utils.ZKPaths import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN} +import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel} import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID @@ -56,7 +58,29 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI // Share level of the engine private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) - private val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + // Server-side engine pool size threshold + private val poolThreshold: Int = conf.get(ENGINE_POOL_SIZE_THRESHOLD) + + @VisibleForTesting + private[kyuubi] val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN).orElse { + val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE) + + if (clientPoolSize > 0) { + val poolSize = if (clientPoolSize <= poolThreshold) { + clientPoolSize + } else { + warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to system threshold " + + s"$poolThreshold") + poolThreshold + } + + // TODO: Currently, we use random policy, and later we can add a sequential policy, + // such as AtomicInteger % poolSize. + Some("engine-pool-" + Random.nextInt(poolSize)) + } else { + None + } + } // Launcher of the engine private val appUser: String = shareLevel match { @@ -126,9 +150,8 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI } private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) { - // TODO: improve this after support engine pool. (KYUUBI #918) - var engineRef = getServerHost(zkClient, engineSpace) // Get the engine address ahead if another process has succeeded + var engineRef = getServerHost(zkClient, engineSpace) if (engineRef.nonEmpty) return engineRef.get conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 536464acd..a3a0c5920 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -43,8 +43,10 @@ class KyuubiSessionImpl( sessionConf: KyuubiConf) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + // TODO: needs improve the hardcode normalizedConf.foreach { case ("use:database", _) => + case ("kyuubi.engine.pool.size.threshold", _) => case (key, value) => sessionConf.set(key, value) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala index 53a46d514..78d0127e8 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.ZooKeeperClientProvider import org.apache.kyuubi.session.SessionHandle @@ -93,6 +94,34 @@ class EngineRefSuite extends KyuubiFunSuite { assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}") } + test("check the engine space of engine pool") { + val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + + // test subdomain + conf.set(ENGINE_SHARE_LEVEL_SUB_DOMAIN, "abc") + val engine1 = EngineRef(conf, user, id) + assert(engine1.subDomain === Some("abc")) + + // unset domain + conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + val engine2 = EngineRef(conf, user, id) + assert(engine2.subDomain === None) + + // 1 <= engine pool size < threshold + conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + conf.set(ENGINE_POOL_SIZE, 3) + val engine3 = EngineRef(conf, user, id) + assert(engine3.subDomain.get.startsWith("engine-pool-")) + + // engine pool size > threshold + conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + conf.set(ENGINE_POOL_SIZE, 100) + val engine4 = EngineRef(conf, user, id) + val engineNumber = Integer.parseInt(engine4.subDomain.get.substring(12)) + val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get + assert(engineNumber <= threshold) + } + test("start and get engine address with lock") { val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala index a761add9d..57a45541b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala @@ -72,7 +72,7 @@ class SparkSqlEngineSuite extends WithKyuubiServer with JDBCTestUtils { test("Fail connections on invalid sub domains") { - Seq("1", ",", "", "a" * 11, "abc.xyz").foreach { invalid => + Seq("1", ",", "", "a" * 15, "abc.xyz").foreach { invalid => val sparkHiveConfigs = Map( ENGINE_SHARE_LEVEL.key -> "USER", ENGINE_SHARE_LEVEL_SUB_DOMAIN.key -> invalid) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala new file mode 100644 index 000000000..b121dfd15 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.scalatest.time.SpanSugar._ + +import org.apache.kyuubi.WithKyuubiServer +import org.apache.kyuubi.config.KyuubiConf + +class KyuubiOperationEnginePoolSuite extends WithKyuubiServer with JDBCTestUtils { + + override protected def jdbcUrl: String = getJdbcUrl + + override protected val conf: KyuubiConf = { + KyuubiConf() + } + + test("ensure app name contains engine-pool when engine pool is enabled.") { + withSessionConf()( + Map( + KyuubiConf.ENGINE_SHARE_LEVEL.key -> "user", + KyuubiConf.ENGINE_POOL_SIZE.key -> "2" + ))(Map.empty) { + + var r1: String = null + withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r1 = res.getString("value") + } + + eventually(timeout(120.seconds), interval(100.milliseconds)) { + assert(r1 != null) + } + + assert(r1.contains("engine-pool-")) + } + } + + test("ensure the sub-domain doesn't work with the CONNECTION share level.") { + withSessionConf()( + Map( + KyuubiConf.ENGINE_SHARE_LEVEL.key -> "connection", + KyuubiConf.ENGINE_POOL_SIZE.key -> "2" + ))(Map.empty) { + + var r1: String = null + withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r1 = res.getString("value") + } + + eventually(timeout(120.seconds), interval(100.milliseconds)) { + assert(r1 != null) + } + + assert(r1.contains("engine-pool-") === false) + } + } +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala index 32aa966be..4e6de55a0 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala @@ -55,4 +55,37 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests { assert(r1 === r2) } + + test("ensure two connections share the same engine when specifying subDomain.") { + withSessionConf()( + Map( + KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key -> "abc" + ))(Map.empty) { + + var r1: String = null + var r2: String = null + new Thread { + override def run(): Unit = withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r1 = res.getString("value") + } + }.start() + + new Thread { + override def run(): Unit = withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r2 = res.getString("value") + } + }.start() + + eventually(timeout(120.seconds), interval(100.milliseconds)) { + assert(r1 != null && r2 != null) + } + + assert(r1 === r2) + } + } + }