[KYUUBI #962] Support engine pool feature
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> For detail, please go to _Originally posted by yaooqinn in https://github.com/apache/incubator-kyuubi/issues/962#issuecomment-905116261_ ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request ``` ./bin/beeline -u 'jdbc:hive2://localhost:10009/;?kyuubi.engine.pool.enabled=true;kyuubi.engine.pool.size=3;' ``` Closes #986 from timothy65535/ky-962-v2. 
Closes #962 b79f8778 [timothy65535] fix test name 9a435607 [timothy65535] fix style abc746b8 [timothy65535] update tests 71b6933d [timothy65535] revert changes 018dee8b [timothy65535] revert changes e3492934 [timothy65535] revert changes 67fc8fd9 [timothy65535] improve threshold fa52acb2 [timothy65535] update pool threshold d8b5977d [timothy65535] remove dup get 7f39b8d7 [timothy65535] fix test error 4fdf8d76 [timothy65535] update thread pool size to 1 -> threshold 2477348a [timothy65535] rich engine pool tests 8eee427f [timothy65535] fix update error msg 104be5ea [timothy65535] fix error tests 8c2d1441 [timothy65535] fix doc ddd0eda9 [timothy65535] improve doc & tests 83d7c5cf [timothy65535] update doc 4b6c7ee3 [timothy65535] improve subdomain 169d9d06 [timothy65535] update todo 1424e565 [timothy65535] increase pool size range 3523cd6c [timothy65535] update todo 2cbeafb2 [timothy65535] remove enable option 424b5364 [timothy65535] revert ServiceDiscovery 6fc1c510 [timothy65535] remove random policy 9feef8f3 [timothy65535] [KYUUBI #962] Support engine pool feature Authored-by: timothy65535 <timothy65535@163.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
c59e54a675
commit
acb4f29906
@ -168,9 +168,11 @@ kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;w
|
||||
kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine\.pool<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>-1</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.engine\.pool<br>\.size\.threshold|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine\.share<br>\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: engine will not be shared but only used by the current client connection</li> <li>USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.sub.domain</li> <li>SERVER: the App will be shared by Kyuubi servers</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
|
||||
|
||||
@ -572,12 +572,12 @@ object KyuubiConf {
|
||||
.checkValues(ShareLevel.values.map(_.toString))
|
||||
.createWithDefault(ShareLevel.USER.toString)
|
||||
|
||||
private val validEngineSubDomain: Pattern = "^[a-zA-Z_]{1,10}$".r.pattern
|
||||
private val validEngineSubDomain: Pattern = "^[a-zA-Z_-]{1,14}$".r.pattern
|
||||
|
||||
val ENGINE_SHARE_LEVEL_SUB_DOMAIN: OptionalConfigEntry[String] =
|
||||
buildConf("engine.share.level.sub.domain")
|
||||
.doc("Allow end-users to create a sub-domain for the share level of an engine. A" +
|
||||
" sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form." +
|
||||
" sub-domain is a case-insensitive string values in `^[a-zA-Z_-]{1,14}$` form." +
|
||||
" For example, for `USER` share level, an end-user can share a certain engine within" +
|
||||
" a sub-domain, not for all of its clients. End-users are free to create multiple" +
|
||||
" engines in the `USER` share level")
|
||||
@ -585,7 +585,7 @@ object KyuubiConf {
|
||||
.stringConf
|
||||
.transform(_.toLowerCase(Locale.ROOT))
|
||||
.checkValue(validEngineSubDomain.matcher(_).matches(),
|
||||
"must be [1, 10] length alphabet string, e.g. 'abc', 'apache'")
|
||||
"must be [1, 14] length alphabet string, e.g. 'abc', 'apache'")
|
||||
.createOptional
|
||||
|
||||
val ENGINE_CONNECTION_URL_USE_HOSTNAME: ConfigEntry[Boolean] =
|
||||
@ -606,6 +606,23 @@ object KyuubiConf {
|
||||
.version("1.2.0")
|
||||
.fallbackConf(LEGACY_ENGINE_SHARE_LEVEL)
|
||||
|
||||
/**
 * Upper limit applied to the engine pool size; described in its own doc text as a
 * server-side parameter. Accepted values are 1 to 32 inclusive, the default is 9.
 */
val ENGINE_POOL_SIZE_THRESHOLD: ConfigEntry[Int] =
  buildConf("engine.pool.size.threshold")
    .doc("This parameter is introduced as a server-side parameter, and controls the upper" +
      " limit of the engine pool.")
    .version("1.4.0")
    .intConf
    .checkValue(
      threshold => threshold >= 1 && threshold <= 32,
      "Invalid engine pool threshold, it should be in [1, 32]")
    .createWithDefault(9)
|
||||
|
||||
/**
 * Requested size of the engine pool. The default of -1 (any value below 1) leaves the pool
 * disabled; the doc text states that larger requests are capped by
 * ENGINE_POOL_SIZE_THRESHOLD.
 */
val ENGINE_POOL_SIZE: ConfigEntry[Int] = {
  val poolSizeDoc =
    "The size of engine pool. Note that, if the size is less than 1, the engine pool will" +
      " not be enabled; otherwise, the size of the engine pool will be" +
      s" min(this, ${ENGINE_POOL_SIZE_THRESHOLD.key})."

  buildConf("engine.pool.size")
    .doc(poolSizeDoc)
    .version("1.4.0")
    .intConf
    .createWithDefault(-1)
}
|
||||
|
||||
val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
|
||||
buildConf("engine.initialize.sql")
|
||||
.doc("SemiColon-separated list of SQL statements to be initialized in the newly created " +
|
||||
|
||||
@ -19,6 +19,8 @@ package org.apache.kyuubi.engine
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
@ -27,7 +29,7 @@ import org.apache.curator.utils.ZKPaths
|
||||
|
||||
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_SHARE_LEVEL, ENGINE_SHARE_LEVEL_SUB_DOMAIN}
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, SERVER, ShareLevel}
|
||||
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SESSION_ID
|
||||
@ -56,7 +58,29 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
|
||||
// Share level of the engine
|
||||
private val shareLevel: ShareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
|
||||
|
||||
private val subDomain: Option[String] = conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN)
|
||||
// Cap on the engine pool size, read from ENGINE_POOL_SIZE_THRESHOLD
private val poolThreshold: Int = conf.get(ENGINE_POOL_SIZE_THRESHOLD)

/**
 * Sub-domain of this engine reference.
 *
 * An explicitly configured ENGINE_SHARE_LEVEL_SUB_DOMAIN always wins. Without one, a positive
 * ENGINE_POOL_SIZE selects a random pool slot named "engine-pool-N", where N is drawn from
 * [0, min(requested size, poolThreshold)). With neither, there is no sub-domain.
 */
@VisibleForTesting
private[kyuubi] val subDomain: Option[String] =
  conf.get(ENGINE_SHARE_LEVEL_SUB_DOMAIN).orElse {
    val requested: Int = conf.get(ENGINE_POOL_SIZE)

    if (requested <= 0) {
      // The engine pool is not enabled for this session
      None
    } else {
      if (requested > poolThreshold) {
        warn(s"Request engine pool size($requested) exceeds, fallback to system threshold " +
          s"$poolThreshold")
      }
      val slots = math.min(requested, poolThreshold)

      // TODO: Currently, we use random policy, and later we can add a sequential policy,
      // such as AtomicInteger % poolSize.
      Some("engine-pool-" + Random.nextInt(slots))
    }
  }
|
||||
|
||||
// Launcher of the engine
|
||||
private val appUser: String = shareLevel match {
|
||||
@ -126,9 +150,8 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
|
||||
}
|
||||
|
||||
private def create(zkClient: CuratorFramework): (String, Int) = tryWithLock(zkClient) {
|
||||
// TODO: improve this after support engine pool. (KYUUBI #918)
|
||||
var engineRef = getServerHost(zkClient, engineSpace)
|
||||
// Get the engine address ahead if another process has succeeded
|
||||
var engineRef = getServerHost(zkClient, engineSpace)
|
||||
if (engineRef.nonEmpty) return engineRef.get
|
||||
|
||||
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
|
||||
|
||||
@ -43,8 +43,10 @@ class KyuubiSessionImpl(
|
||||
sessionConf: KyuubiConf)
|
||||
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
|
||||
|
||||
// Copy the client-supplied settings into the session conf, except for entries that must not
// be taken from the client request:
//  - "use:database" is skipped here (presumably handled elsewhere - TODO confirm);
//  - the engine pool size threshold, which its own doc describes as a server-side parameter.
//    The key is taken from the config entry so this filter cannot drift from its definition.
// TODO: needs improve the hardcode
normalizedConf.foreach {
  case ("use:database", _) =>
  case (key, _) if key == KyuubiConf.ENGINE_POOL_SIZE_THRESHOLD.key =>
  case (key, value) => sessionConf.set(key, value)
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf
|
||||
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
@ -93,6 +94,34 @@ class EngineRefSuite extends KyuubiFunSuite {
|
||||
assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${user}_abc_${id.identifier}")
|
||||
}
|
||||
|
||||
test("check the engine space of engine pool") {
  val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
  val poolPrefix = "engine-pool-"

  // Extracts the slot index from a pool-generated sub-domain, failing the test if the
  // sub-domain is missing or does not carry the pool prefix.
  def poolSlot(subDomain: Option[String]): Int = {
    val domain = subDomain.getOrElse(fail("expected an engine pool sub-domain"))
    assert(domain.startsWith(poolPrefix))
    domain.stripPrefix(poolPrefix).toInt
  }

  try {
    // an explicitly configured sub-domain is used as is
    conf.set(ENGINE_SHARE_LEVEL_SUB_DOMAIN, "abc")
    val engine1 = EngineRef(conf, user, id)
    assert(engine1.subDomain === Some("abc"))

    // neither a sub-domain nor a pool size is configured
    conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN)
    val engine2 = EngineRef(conf, user, id)
    assert(engine2.subDomain === None)

    // 1 <= engine pool size <= threshold: the slot index is drawn from [0, pool size)
    conf.set(ENGINE_POOL_SIZE, 3)
    val slotWithinLimit = poolSlot(EngineRef(conf, user, id).subDomain)
    assert(slotWithinLimit >= 0 && slotWithinLimit < 3)

    // engine pool size > threshold: the slot index is drawn from [0, threshold)
    conf.set(ENGINE_POOL_SIZE, 100)
    val threshold = conf.get(ENGINE_POOL_SIZE_THRESHOLD)
    val slotAboveLimit = poolSlot(EngineRef(conf, user, id).subDomain)
    assert(slotAboveLimit >= 0 && slotAboveLimit < threshold)
  } finally {
    // `conf` is not local to this test, so undo the settings changed above; otherwise the
    // pool size would stay in effect for whatever uses the same conf afterwards.
    conf.unset(ENGINE_SHARE_LEVEL_SUB_DOMAIN)
    conf.unset(ENGINE_POOL_SIZE)
  }
}
|
||||
|
||||
test("start and get engine address with lock") {
|
||||
val id = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
|
||||
|
||||
@ -72,7 +72,7 @@ class SparkSqlEngineSuite extends WithKyuubiServer with JDBCTestUtils {
|
||||
|
||||
|
||||
test("Fail connections on invalid sub domains") {
|
||||
Seq("1", ",", "", "a" * 11, "abc.xyz").foreach { invalid =>
|
||||
Seq("1", ",", "", "a" * 15, "abc.xyz").foreach { invalid =>
|
||||
val sparkHiveConfigs = Map(
|
||||
ENGINE_SHARE_LEVEL.key -> "USER",
|
||||
ENGINE_SHARE_LEVEL_SUB_DOMAIN.key -> invalid)
|
||||
|
||||
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.kyuubi.WithKyuubiServer
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
/**
 * Checks, through JDBC connections to a Kyuubi server, whether the engine pool marker
 * ("engine-pool-") appears in the reported spark.app.name for different share levels.
 */
class KyuubiOperationEnginePoolSuite extends WithKyuubiServer with JDBCTestUtils {

  override protected def jdbcUrl: String = getJdbcUrl

  override protected val conf: KyuubiConf = KyuubiConf()

  /** Settings handed to withSessionConf: the given share level plus a pool size of 2. */
  private def poolConfs(shareLevel: String): Map[String, String] = Map(
    KyuubiConf.ENGINE_SHARE_LEVEL.key -> shareLevel,
    KyuubiConf.ENGINE_POOL_SIZE.key -> "2")

  /** Runs "set spark.app.name" through a JDBC statement and returns the "value" column. */
  private def fetchSparkAppName(): String = {
    var appName: String = null
    withJdbcStatement() { stmt =>
      val rs = stmt.executeQuery("set spark.app.name")
      assert(rs.next())
      appName = rs.getString("value")
    }

    // Wait until the value has been captured before handing it back
    eventually(timeout(120.seconds), interval(100.milliseconds)) {
      assert(appName != null)
    }
    appName
  }

  test("ensure app name contains engine-pool when engine pool is enabled.") {
    withSessionConf()(poolConfs("user"))(Map.empty) {
      assert(fetchSparkAppName().contains("engine-pool-"))
    }
  }

  test("ensure the sub-domain doesn't work with the CONNECTION share level.") {
    withSessionConf()(poolConfs("connection"))(Map.empty) {
      assert(!fetchSparkAppName().contains("engine-pool-"))
    }
  }
}
|
||||
@ -55,4 +55,37 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with JDBCTests {
|
||||
|
||||
assert(r1 === r2)
|
||||
}
|
||||
|
||||
test("ensure two connections share the same engine when specifying subDomain.") {
  val subDomainConf = Map(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN.key -> "abc")

  withSessionConf()(subDomainConf)(Map.empty) {
    var firstAppName: String = null
    var secondAppName: String = null

    // Starts a thread that runs its own withJdbcStatement, reads spark.app.name and passes
    // the value to `record`.
    def readAppNameInBackground(record: String => Unit): Unit = {
      val worker = new Thread {
        override def run(): Unit = withJdbcStatement() { stmt =>
          val rs = stmt.executeQuery("set spark.app.name")
          assert(rs.next())
          record(rs.getString("value"))
        }
      }
      worker.start()
    }

    readAppNameInBackground(name => firstAppName = name)
    readAppNameInBackground(name => secondAppName = name)

    // The two threads report asynchronously, so poll until both values have arrived
    eventually(timeout(120.seconds), interval(100.milliseconds)) {
      assert(firstAppName != null && secondAppName != null)
    }

    // Same application name on both connections means they share one engine
    assert(firstAppName === secondAppName)
  }
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user