diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index c87796fc2..35b8a2860 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -168,9 +168,11 @@ kyuubi\.engine
\.deregister\.job\.max
\.failures|
/tmp/kyuubi/events
|
The location where all the engine events go for the built-in JSON logger.
|
string
|
1.3.0
kyuubi\.engine\.event
\.loggers|
|
A comma-separated list of engine history loggers, where engine/session/operation etc. events go.
|
seq
|
1.3.0
kyuubi\.engine
\.initialize\.sql|
SHOW DATABASES
|
Semicolon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration cannot be used in a JDBC URL due to the limitation of the Beeline/JDBC driver.
|
seq
|
1.2.0
+kyuubi\.engine\.pool
\.size|
-1
|
The size of the engine pool. Note that if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|
int
|
1.4.0
+kyuubi\.engine\.pool
\.size\.threshold|
9
|
This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.
|
int
|
1.4.0
kyuubi\.engine\.session
\.initialize\.sql|
SHOW DATABASES
|
Semicolon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration cannot be used in a JDBC URL due to the limitation of the Beeline/JDBC driver.
|
seq
|
1.3.0
kyuubi\.engine\.share
\.level|
USER
|
Engines will be shared in different levels, available configs are:
|
string
|
1.2.0
-kyuubi\.engine\.share
\.level\.sub\.domain|
<undefined>
|
Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level
|
string
|
1.2.0
+kyuubi\.engine\.share
\.level\.sub\.domain|
<undefined>
|
Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string value in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level.
|
string
|
1.2.0
kyuubi\.engine\.single
\.spark\.session|
false
|
When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.
|
boolean
|
1.3.0
kyuubi\.engine\.ui\.stop
\.enabled|
true
|
When true, allows Kyuubi engine to be killed from the Spark Web UI.
|
boolean
|
1.3.0
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6073f5fb1..7627ef25d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -572,12 +572,12 @@ object KyuubiConf { .checkValues(ShareLevel.values.map(_.toString)) .createWithDefault(ShareLevel.USER.toString) - private val validEngineSubDomain: Pattern = "^[a-zA-Z_]{1,10}$".r.pattern + private val validEngineSubDomain: Pattern = "^[a-zA-Z_-]{1,14}$".r.pattern val ENGINE_SHARE_LEVEL_SUB_DOMAIN: OptionalConfigEntry[String] = buildConf("engine.share.level.sub.domain") .doc("Allow end-users to create a sub-domain for the share level of an engine. A" + - " sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form." + + " sub-domain is a case-insensitive string values in `^[a-zA-Z_-]{1,14}$` form." + " For example, for `USER` share level, an end-user can share a certain engine within" + " a sub-domain, not for all of its clients. End-users are free to create multiple" + " engines in the `USER` share level") @@ -585,7 +585,7 @@ object KyuubiConf { .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValue(validEngineSubDomain.matcher(_).matches(), - "must be [1, 10] length alphabet string, e.g. 'abc', 'apache'") + "must be [1, 14] length alphabet string, e.g. 
'abc', 'apache'") .createOptional val ENGINE_CONNECTION_URL_USE_HOSTNAME: ConfigEntry[Boolean] = @@ -606,6 +606,23 @@ object KyuubiConf { .version("1.2.0") .fallbackConf(LEGACY_ENGINE_SHARE_LEVEL) + val ENGINE_POOL_SIZE_THRESHOLD: ConfigEntry[Int] = buildConf("engine.pool.size.threshold") + .doc("This parameter is introduced as a server-side parameter, " + + "and controls the upper limit of the engine pool.") + .version("1.4.0") + .intConf + .checkValue(s => s > 0 && s < 33, "Invalid engine pool threshold, it should be in [1, 32]") + .createWithDefault(9) + + val ENGINE_POOL_SIZE: ConfigEntry[Int] = buildConf("engine.pool.size") + .doc("The size of engine pool. Note that, " + + "if the size is less than 1, the engine pool will not be enabled; " + + "otherwise, the size of the engine pool will be " + + s"min(this, ${ENGINE_POOL_SIZE_THRESHOLD.key}).") + .version("1.4.0") + .intConf + .createWithDefault(-1) + val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] = buildConf("engine.initialize.sql") .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index ee34b2611..0eecccb5e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -19,6 +19,8 @@ package org.apache.kyuubi.engine import java.util.concurrent.TimeUnit +import scala.util.Random + import com.codahale.metrics.MetricRegistry import com.google.common.annotations.VisibleForTesting import org.apache.curator.framework.CuratorFramework @@ -27,7 +29,7 @@ import org.apache.curator.utils.ZKPaths import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN} +import 
org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel} import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID @@ -56,7 +58,29 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI // Share level of the engine private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) - private val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + // Server-side engine pool size threshold + private val poolThreshold: Int = conf.get(ENGINE_POOL_SIZE_THRESHOLD) + + @VisibleForTesting + private[kyuubi] val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN).orElse { + val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE) + + if (clientPoolSize > 0) { + val poolSize = if (clientPoolSize <= poolThreshold) { + clientPoolSize + } else { + warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to system threshold " + + s"$poolThreshold") + poolThreshold + } + + // TODO: Currently, we use random policy, and later we can add a sequential policy, + // such as AtomicInteger % poolSize. + Some("engine-pool-" + Random.nextInt(poolSize)) + } else { + None + } + } // Launcher of the engine private val appUser: String = shareLevel match { @@ -126,9 +150,8 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI } private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) { - // TODO: improve this after support engine pool. 
(KYUUBI #918) - var engineRef = getServerHost(zkClient, engineSpace) // Get the engine address ahead if another process has succeeded + var engineRef = getServerHost(zkClient, engineSpace) if (engineRef.nonEmpty) return engineRef.get conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 536464acd..a3a0c5920 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -43,8 +43,10 @@ class KyuubiSessionImpl( sessionConf: KyuubiConf) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + // TODO: needs improve the hardcode normalizedConf.foreach { case ("use:database", _) => + case ("kyuubi.engine.pool.size.threshold", _) => case (key, value) => sessionConf.set(key, value) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala index 53a46d514..78d0127e8 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.ZooKeeperClientProvider import org.apache.kyuubi.session.SessionHandle @@ -93,6 +94,34 @@ class EngineRefSuite extends KyuubiFunSuite { assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}") } + test("check the engine space of engine pool") { + val id = 
SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + + // test subdomain + conf.set(ENGINE_SHARE_LEVEL_SUB_DOMAIN, "abc") + val engine1 = EngineRef(conf, user, id) + assert(engine1.subDomain === Some("abc")) + + // unset domain + conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + val engine2 = EngineRef(conf, user, id) + assert(engine2.subDomain === None) + + // 1 <= engine pool size < threshold + conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + conf.set(ENGINE_POOL_SIZE, 3) + val engine3 = EngineRef(conf, user, id) + assert(engine3.subDomain.get.startsWith("engine-pool-")) + + // engine pool size > threshold + conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN) + conf.set(ENGINE_POOL_SIZE, 100) + val engine4 = EngineRef(conf, user, id) + val engineNumber = Integer.parseInt(engine4.subDomain.get.substring(12)) + val threshold = ENGINE_POOL_SIZE_THRESHOLD.defaultVal.get + assert(engineNumber <= threshold) + } + test("start and get engine address with lock") { val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala index a761add9d..57a45541b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala @@ -72,7 +72,7 @@ class SparkSqlEngineSuite extends WithKyuubiServer with JDBCTestUtils { test("Fail connections on invalid sub domains") { - Seq("1", ",", "", "a" * 11, "abc.xyz").foreach { invalid => + Seq("1", ",", "", "a" * 15, "abc.xyz").foreach { invalid => val sparkHiveConfigs = Map( ENGINE_SHARE_LEVEL.key -> "USER", ENGINE_SHARE_LEVEL_SUB_DOMAIN.key -> invalid) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala new file mode 100644 index 000000000..b121dfd15 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationEnginePoolSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.operation + +import org.scalatest.time.SpanSugar._ + +import org.apache.kyuubi.WithKyuubiServer +import org.apache.kyuubi.config.KyuubiConf + +class KyuubiOperationEnginePoolSuite extends WithKyuubiServer with JDBCTestUtils { + + override protected def jdbcUrl: String = getJdbcUrl + + override protected val conf: KyuubiConf = { + KyuubiConf() + } + + test("ensure app name contains engine-pool when engine pool is enabled.") { + withSessionConf()( + Map( + KyuubiConf.ENGINE_SHARE_LEVEL.key -> "user", + KyuubiConf.ENGINE_POOL_SIZE.key -> "2" + ))(Map.empty) { + + var r1: String = null + withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r1 = res.getString("value") + } + + eventually(timeout(120.seconds), interval(100.milliseconds)) { + assert(r1 != null) + } + + assert(r1.contains("engine-pool-")) + } + } + + test("ensure the sub-domain doesn't work with the CONNECTION share level.") { + withSessionConf()( + Map( + KyuubiConf.ENGINE_SHARE_LEVEL.key -> "connection", + KyuubiConf.ENGINE_POOL_SIZE.key -> "2" + ))(Map.empty) { + + var r1: String = null + withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r1 = res.getString("value") + } + + eventually(timeout(120.seconds), interval(100.milliseconds)) { + assert(r1 != null) + } + + assert(r1.contains("engine-pool-") === false) + } + } +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala index 32aa966be..4e6de55a0 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala @@ -55,4 +55,37 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests { assert(r1 === r2) 
} + + test("ensure two connections share the same engine when specifying subDomain.") { + withSessionConf()( + Map( + KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key -> "abc" + ))(Map.empty) { + + var r1: String = null + var r2: String = null + new Thread { + override def run(): Unit = withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r1 = res.getString("value") + } + }.start() + + new Thread { + override def run(): Unit = withJdbcStatement() { statement => + val res = statement.executeQuery("set spark.app.name") + assert(res.next()) + r2 = res.getString("value") + } + }.start() + + eventually(timeout(120.seconds), interval(100.milliseconds)) { + assert(r1 != null && r2 != null) + } + + assert(r1 === r2) + } + } + }