[KYUUBI #662]Fix run on k8s
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> When run on k8s, engine write hostname to zk. But, server can not connect pod by hostname, we should use ip instead. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #663 from hddong/fix-k8s-connect. Closes #662 385ba6b [hongdongdong] fix test 370201d [hongdongdong] fix comments and add test 736261a [hongdongdong] [KYUUBI #662]Fix run on k8s Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
5867893b32
commit
b75ea337f6
@ -138,6 +138,7 @@ kyuubi\.delegation<br>\.token\.renew\.interval|<div style='width: 65pt;word-wrap
|
||||
|
||||
Key | Default | Meaning | Type | Since
|
||||
--- | --- | --- | --- | ---
|
||||
kyuubi\.engine<br>\.connection\.url\.use<br>\.hostname|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, engine register with hostname to zookeeper. When spark run on k8s with cluster mode, set to false to ensure that server can connect to engine</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine<br>\.deregister\.exception<br>\.classes|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine<br>\.deregister\.exception<br>\.messages|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
|
||||
@ -572,6 +572,14 @@ object KyuubiConf {
|
||||
"must be [1, 10] length alphabet string, e.g. 'abc', 'apache'")
|
||||
.createOptional
|
||||
|
||||
val ENGINE_CONNECTION_URL_USE_HOSTNAME: ConfigEntry[Boolean] =
|
||||
buildConf("engine.connection.url.use.hostname")
|
||||
.doc("When true, engine register with hostname to zookeeper. When spark run on k8s" +
|
||||
" with cluster mode, set to false to ensure that server can connect to engine")
|
||||
.version("1.3.0")
|
||||
.booleanConf
|
||||
.createWithDefault(true)
|
||||
|
||||
val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("engine.share.level")
|
||||
.doc("Engines will be shared in different levels, available configs are: <ul>" +
|
||||
" <li>CONNECTION: engine will not be shared but only used by the current client" +
|
||||
|
||||
@ -104,10 +104,16 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
def connectionUrl: String = {
|
||||
def connectionUrl(server: Boolean = false): String = {
|
||||
getServiceState match {
|
||||
case s @ ServiceState.LATENT => throw new IllegalStateException(s"Illegal Service State: $s")
|
||||
case _ => s"${serverAddr.getCanonicalHostName}:$portNum"
|
||||
case _ =>
|
||||
if (server || conf.get(ENGINE_CONNECTION_URL_USE_HOSTNAME)) {
|
||||
s"${serverAddr.getCanonicalHostName}:$portNum"
|
||||
} else {
|
||||
// engine use address if run on k8s with cluster mode
|
||||
s"${serverAddr.getHostAddress}:$portNum"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,7 +127,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
|
||||
}
|
||||
|
||||
override def run(): Unit = try {
|
||||
info(s"Starting and exposing JDBC connection at: jdbc:hive2://$connectionUrl/")
|
||||
info(s"Starting and exposing JDBC connection at: jdbc:hive2://${connectionUrl(true)}/")
|
||||
server.foreach(_.serve())
|
||||
} catch {
|
||||
case _: InterruptedException => error(s"$getName is interrupted")
|
||||
|
||||
@ -31,7 +31,7 @@ abstract class Serverable(name: String) extends CompositeService(name) {
|
||||
protected def supportsServiceDiscovery: Boolean
|
||||
val discoveryService: Service
|
||||
|
||||
def connectionUrl: String = frontendService.connectionUrl
|
||||
def connectionUrl: String = frontendService.connectionUrl()
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = synchronized {
|
||||
this.conf = conf
|
||||
|
||||
@ -481,4 +481,13 @@ class FrontendServiceSuite extends KyuubiFunSuite {
|
||||
"Delegation token is not supported")
|
||||
}
|
||||
}
|
||||
|
||||
test("engine connect url use hostname") {
|
||||
// default use hostname
|
||||
assert(server.connectionUrl.startsWith("localhost"))
|
||||
|
||||
// use ip address
|
||||
conf.set(KyuubiConf.ENGINE_CONNECTION_URL_USE_HOSTNAME, false)
|
||||
assert(server.connectionUrl.startsWith("127.0.0.1"))
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user