From b75ea337f6f653d73d14a03931ea294750f5a85d Mon Sep 17 00:00:00 2001 From: hongdongdong Date: Thu, 3 Jun 2021 18:25:11 +0800 Subject: [PATCH] [KYUUBI #662]Fix run on k8s ### _Why are the changes needed?_ When run on k8s, engine write hostname to zk. But, server can not connect pod by hostname, we should use ip instead. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #663 from hddong/fix-k8s-connect. Closes #662 385ba6b [hongdongdong] fix test 370201d [hongdongdong] fix comments and add test 736261a [hongdongdong] [KYUUBI #662]Fix run on k8s Authored-by: hongdongdong Signed-off-by: Kent Yao --- docs/deployment/settings.md | 1 + .../scala/org/apache/kyuubi/config/KyuubiConf.scala | 8 ++++++++ .../org/apache/kyuubi/service/FrontendService.scala | 12 +++++++++--- .../scala/org/apache/kyuubi/service/Serverable.scala | 2 +- .../apache/kyuubi/service/FrontendServiceSuite.scala | 9 +++++++++ 5 files changed, 28 insertions(+), 4 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 34d33f863..f0089e0da 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -138,6 +138,7 @@ kyuubi\.delegation
\.token\.renew\.interval|
true
|
When true, engine register with hostname to zookeeper. When spark run on k8s with cluster mode, set to false to ensure that server can connect to engine
|
boolean
|
1.3.0
kyuubi\.engine
\.deregister\.exception
\.classes|
|
A comma separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself.
|
seq
|
1.2.0
kyuubi\.engine
\.deregister\.exception
\.messages|
|
A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.
|
seq
|
1.2.0
kyuubi\.engine
\.deregister\.exception
\.ttl|
PT30M
|
Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.
|
duration
|
1.2.0
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index a6cafb0e5..5dade0af6 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -572,6 +572,14 @@ object KyuubiConf { "must be [1, 10] length alphabet string, e.g. 'abc', 'apache'") .createOptional + val ENGINE_CONNECTION_URL_USE_HOSTNAME: ConfigEntry[Boolean] = + buildConf("engine.connection.url.use.hostname") + .doc("When true, engine register with hostname to zookeeper. When spark run on k8s" + + " with cluster mode, set to false to ensure that server can connect to engine") + .version("1.3.0") + .booleanConf + .createWithDefault(true) + val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("engine.share.level") .doc("Engines will be shared in different levels, available configs are:
    " + "
  • CONNECTION: engine will not be shared but only used by the current client" + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala index a47be78c4..adba76504 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/FrontendService.scala @@ -104,10 +104,16 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab super.initialize(conf) } - def connectionUrl: String = { + def connectionUrl(server: Boolean = false): String = { getServiceState match { case s @ ServiceState.LATENT => throw new IllegalStateException(s"Illegal Service State: $s") - case _ => s"${serverAddr.getCanonicalHostName}:$portNum" + case _ => + if (server || conf.get(ENGINE_CONNECTION_URL_USE_HOSTNAME)) { + s"${serverAddr.getCanonicalHostName}:$portNum" + } else { + // engine use address if run on k8s with cluster mode + s"${serverAddr.getHostAddress}:$portNum" + } } } @@ -121,7 +127,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab } override def run(): Unit = try { - info(s"Starting and exposing JDBC connection at: jdbc:hive2://$connectionUrl/") + info(s"Starting and exposing JDBC connection at: jdbc:hive2://${connectionUrl(true)}/") server.foreach(_.serve()) } catch { case _: InterruptedException => error(s"$getName is interrupted") diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala index c0077db6f..4a432641e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala @@ -31,7 +31,7 @@ abstract class Serverable(name: String) extends CompositeService(name) { protected def supportsServiceDiscovery: Boolean val discoveryService: Service - def connectionUrl: String = frontendService.connectionUrl + def connectionUrl: String = frontendService.connectionUrl() override def initialize(conf: KyuubiConf): Unit = synchronized { this.conf = conf diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala index fa243eb7e..a79598d9c 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/FrontendServiceSuite.scala @@ -481,4 +481,13 @@ class FrontendServiceSuite extends KyuubiFunSuite { "Delegation token is not supported") } } + + test("engine connect url use hostname") { + // default use hostname + assert(server.connectionUrl.startsWith("localhost")) + + // use ip address + conf.set(KyuubiConf.ENGINE_CONNECTION_URL_USE_HOSTNAME, false) + assert(server.connectionUrl.startsWith("127.0.0.1")) + } }