[KYUUBI #2040] [TEST] Add Zookeeper 3.4/3.5/3.6/3.7 integration test

### _Why are the changes needed?_

This PR proposes to add Testcontainers based Zookeeper 3.4/3.5/3.6/3.7 integration test, we need it to verify the compatibility if we upgrade Zookeeper client to the new version. See details in #1941

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2040 from pan3793/zk-3-4-it.

Closes #2040

929540f6 [Cheng Pan] [TEST] Add Zookeeper 3.4/3.5/3.6 integration test

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2022-09-13 09:32:16 +08:00
parent 7c8d92c40a
commit c24a192dcb
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
11 changed files with 383 additions and 145 deletions

View File

@ -404,3 +404,37 @@ jobs:
path: |
**/target/unit-tests.log
**/kyuubi-spark-sql-engine.log*
zookeeper-it:
name: Zookeeper Integration Test
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
java: [ 8 ]
zookeeper: ["3.4", "3.5", "3.6", "3.7" ]
comment: [ "normal" ]
steps:
- uses: actions/checkout@v2
- name: Tune Runner VM
uses: ./.github/actions/tune-runner-vm
- name: Setup JDK ${{ matrix.java }}
uses: actions/setup-java@v2
with:
distribution: zulu
java-version: ${{ matrix.java }}
cache: 'maven'
check-latest: false
- name: zookeeper integration tests
run: |
export KYUUBI_IT_ZOOKEEPER_VERSION=${{ matrix.zookeeper }}
TEST_MODULES="integration-tests/kyuubi-zookeeper-it"
./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} -am clean install -DskipTests
./build/mvn ${MVN_OPT} -pl ${TEST_MODULES} test
- name: Upload test logs
if: failure()
uses: actions/upload-artifact@v2
with:
name: unit-tests-log-java-${{ matrix.java }}-zookeeper-${{ matrix.comment }}
path: |
**/target/unit-tests.log

View File

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>integration-tests</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kyuubi-zookeeper-it_2.12</artifactId>
<name>Kyuubi Test Zookeeper IT</name>
<url>https://kyuubi.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-hive-jdbc-shaded</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.zookeeper
import com.dimafeng.testcontainers.{GenericContainer, SingleContainer}
import org.testcontainers.containers.wait.strategy.Wait
import org.apache.kyuubi.Utils
import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClientSuite
class DockerizedZkServiceDiscoverySuite extends ZookeeperDiscoveryClientSuite {
private val zkClientPort = 2181
private val zkVersion = sys.env.getOrElse("KYUUBI_IT_ZOOKEEPER_VERSION", "3.4")
private val zkImage = sys.env.getOrElse("KYUUBI_IT_ZOOKEEPER_IMAGE", s"zookeeper:$zkVersion")
val container: SingleContainer[_] = GenericContainer(
dockerImage = zkImage,
exposedPorts = Seq(zkClientPort),
waitStrategy = Wait.forListeningPort)
override def getConnectString: String = s"${container.host}:${container.mappedPort(zkClientPort)}"
override def startZk(): Unit = synchronized {
container.start()
}
override def stopZk(): Unit = synchronized {
Utils.tryLogNonFatalError { container.stop() }
}
}

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Extra logging related to initialization of Log4j.
Set to debug or trace if log4j initialization is failing. -->
<Configuration status="WARN">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %p %c: %m%n"/>
<Filters>
<ThresholdFilter level="FATAL"/>
<RegexFilter regex=".*Thrift error occurred during processing of message.*" onMatch="DENY" onMismatch="NEUTRAL"/>
</Filters>
</Console>
<File name="file" fileName="target/unit-tests.log">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n"/>
<Filters>
<RegexFilter regex=".*Thrift error occurred during processing of message.*" onMatch="DENY" onMismatch="NEUTRAL"/>
</Filters>
</File>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="stdout"/>
<AppenderRef ref="file"/>
</Root>
</Loggers>
</Configuration>

View File

@ -36,6 +36,7 @@
<module>kyuubi-hive-it</module>
<module>kyuubi-trino-it</module>
<module>kyuubi-jdbc-it</module>
<module>kyuubi-zookeeper-it</module>
</modules>
<profiles>

View File

@ -126,5 +126,21 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-test-jar</id>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -174,13 +174,13 @@ object DiscoveryClient {
private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
val maybeInfos = instance.split(";")
.map(_.split("=", 2))
.filter(_.size == 2)
.filter(_.length == 2)
.map(i => (i(0), i(1)))
.toMap
if (maybeInfos.size > 0) {
if (maybeInfos.nonEmpty) {
(
maybeInfos.get("hive.server2.thrift.bind.host").get,
maybeInfos.get("hive.server2.thrift.port").get.toInt)
maybeInfos("hive.server2.thrift.bind.host"),
maybeInfos("hive.server2.thrift.port").toInt)
} else {
val strings = instance.split(":")
(strings(0), strings(1).toInt)

View File

@ -18,16 +18,23 @@
package org.apache.kyuubi.ha.client
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
class DiscoveryClientProviderSuite extends KyuubiFunSuite {
test("discovery") {
val conf = KyuubiConf()
DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
discoveryClient.getServerHost("/kyuubi")
}
DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
discoveryClient.getServerHost("/kyuubi")
}
trait DiscoveryClientSuite extends KyuubiFunSuite {
test("parse host and port from instance string") {
val host = "127.0.0.1"
val port = 10009
val instance1 = s"$host:$port"
val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
assert(host === host1)
assert(port === port1)
val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
s"hive.server2.thrift.port=$port;" +
s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
assert(host === host2)
assert(port === port2)
}
}

View File

@ -17,141 +17,137 @@
package org.apache.kyuubi.ha.client
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KYUUBI_VERSION
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_AUTH_TYPE
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_NAMESPACE}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.service.NoopTBinaryFrontendServer
import org.apache.kyuubi.service.NoopTBinaryFrontendService
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.service.Service
import org.apache.kyuubi.service.ServiceState
import org.apache.kyuubi.service._
trait DiscoveryClientTests extends KyuubiFunSuite {
protected val conf: KyuubiConf
protected def conf: KyuubiConf
protected def getConnectString: String
test("publish instance to embedded zookeeper server") {
test("publish instance to discovery service") {
val namespace = "kyuubiserver"
conf
.unset(KyuubiConf.SERVER_KEYTAB)
.unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ADDRESSES, getConnectString)
.set(HA_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
var serviceDiscovery: KyuubiServiceDiscovery = null
val server: Serverable = new NoopTBinaryFrontendServer() {
var discovery: ServiceDiscovery = null
val service: Serverable = new NoopTBinaryFrontendServer() {
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
new NoopTBinaryFrontendService(this) {
override val discoveryService: Option[Service] = {
serviceDiscovery = new KyuubiServiceDiscovery(this)
Some(serviceDiscovery)
discovery = new KyuubiServiceDiscovery(this)
Some(discovery)
}
})
}
server.initialize(conf)
server.start()
val znodeRoot = s"/$namespace"
withDiscoveryClient(conf) { framework =>
try {
assert(framework.pathNonExists("/abc"))
assert(framework.pathExists(znodeRoot))
val children = framework.getChildren(znodeRoot)
service.initialize(conf)
service.start()
val basePath = s"/$namespace"
try {
withDiscoveryClient(conf) { discoveryClient =>
assert(discoveryClient.pathNonExists("/abc"))
assert(discoveryClient.pathExists(basePath))
val children = discoveryClient.getChildren(basePath)
assert(children.head ===
s"serviceUri=${server.frontendServices.head.connectionUrl};" +
s"serviceUri=${service.frontendServices.head.connectionUrl};" +
s"version=$KYUUBI_VERSION;sequence=0000000000")
children.foreach { child =>
framework.delete(s"""$znodeRoot/$child""")
discoveryClient.delete(s"$basePath/$child")
}
eventually(timeout(5.seconds), interval(100.millis)) {
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
assert(server.getServiceState === ServiceState.STOPPED)
assert(discovery.getServiceState === ServiceState.STOPPED)
assert(service.getServiceState === ServiceState.STOPPED)
}
} finally {
server.stop()
}
} finally {
service.stop()
discovery.stop()
}
}
test("KYUUBI-304: Stop engine service gracefully when related zk node is deleted") {
test("KYUUBI #304: Stop engine service gracefully when related node is deleted") {
val logAppender = new LogAppender("test stop engine gracefully")
withLogAppender(logAppender) {
val namespace = "kyuubiengine"
conf
.unset(KyuubiConf.SERVER_KEYTAB)
.unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ADDRESSES, getConnectString)
.set(HA_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
var serviceDiscovery: KyuubiServiceDiscovery = null
val server: Serverable = new NoopTBinaryFrontendServer() {
var discovery: ServiceDiscovery = null
val service: Serverable = new NoopTBinaryFrontendServer() {
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
new NoopTBinaryFrontendService(this) {
override val discoveryService: Option[Service] = {
serviceDiscovery = new KyuubiServiceDiscovery(this)
Some(serviceDiscovery)
discovery = new KyuubiServiceDiscovery(this)
Some(discovery)
}
})
}
server.initialize(conf)
server.start()
service.initialize(conf)
service.start()
val znodeRoot = s"/$namespace"
withDiscoveryClient(conf) { framework =>
try {
assert(framework.pathNonExists("/abc"))
assert(framework.pathExists(znodeRoot))
val children = framework.getChildren(znodeRoot)
val basePath = s"/$namespace"
try {
withDiscoveryClient(conf) { discoveryClient =>
assert(discoveryClient.pathNonExists("/abc"))
assert(discoveryClient.pathExists(basePath))
val children = discoveryClient.getChildren(basePath)
assert(children.head ===
s"serviceUri=${server.frontendServices.head.connectionUrl};" +
s"serviceUri=${service.frontendServices.head.connectionUrl};" +
s"version=$KYUUBI_VERSION;sequence=0000000000")
children.foreach { child =>
framework.delete(s"""$znodeRoot/$child""")
discoveryClient.delete(s"""$basePath/$child""")
}
eventually(timeout(5.seconds), interval(100.millis)) {
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
assert(server.getServiceState === ServiceState.STOPPED)
val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
assert(discovery.getServiceState === ServiceState.STOPPED)
assert(service.getServiceState === ServiceState.STOPPED)
val msg = s"This Kyuubi instance ${service.frontendServices.head.connectionUrl}" +
s" is now de-registered"
assert(logAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains(msg)))
}
} finally {
server.stop()
serviceDiscovery.stop()
}
} finally {
service.stop()
discovery.stop()
}
}
}
test("parse host and port from instance string") {
val host = "127.0.0.1"
val port = 10009
val instance1 = s"$host:$port"
val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
assert(host === host1)
assert(port === port1)
test("distribute lock") {
val lockPath = "/lock-test"
val lockLatch = new CountDownLatch(1)
val instance2 = s"hive.server2.thrift.sasl.qop=auth;hive.server2.thrift.bind.host=$host;" +
s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
s"hive.server2.thrift.port=$port;" +
s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
assert(host === host2)
assert(port === port2)
new Thread(() => {
withDiscoveryClient(conf) { discoveryClient =>
discoveryClient.tryWithLock(lockPath, 3000) {
lockLatch.countDown()
Thread.sleep(5000)
}
}
}).start()
withDiscoveryClient(conf) { discoveryClient =>
assert(lockLatch.await(5000, TimeUnit.MILLISECONDS))
val e = intercept[KyuubiSQLException] {
discoveryClient.tryWithLock(lockPath, 2000) {}
}
assert(e.getMessage contains s"Timeout to lock on path [$lockPath]")
}
}
}

View File

@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import io.etcd.jetcd.launcher.{Etcd, EtcdCluster}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_CLIENT_CLASS
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_CLIENT_CLASS}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryClientTests
import org.apache.kyuubi.service.NoopTBinaryFrontendServer
@ -38,24 +38,26 @@ class EtcdDiscoveryClientSuite extends DiscoveryClientTests {
override def getConnectString: String = _connectString
val conf: KyuubiConf = {
KyuubiConf()
.set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
}
var conf: KyuubiConf = KyuubiConf()
.set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
override def beforeAll(): Unit = {
etcdCluster = new Etcd.Builder()
.withNodes(2)
.build()
etcdCluster.start()
conf = new KyuubiConf()
.set(HA_CLIENT_CLASS, classOf[EtcdDiscoveryClient].getName)
.set(HA_ADDRESSES, getConnectString)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
if (etcdCluster != null) {
etcdCluster.close()
etcdCluster = null
}
super.afterAll()
}
test("etcd test: set, get and delete") {

View File

@ -33,32 +33,48 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KerberizedTestHelper
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.DiscoveryClientTests
import org.apache.kyuubi.ha.client.EngineServiceDiscovery
import org.apache.kyuubi.ha.client._
import org.apache.kyuubi.service._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT
class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with KerberizedTestHelper {
class EmbeddedZookeeperDiscoveryClientSuite extends ZookeeperDiscoveryClientSuite {
val zkServer = new EmbeddedZookeeper()
override val conf: KyuubiConf = KyuubiConf()
private var _zkServer: EmbeddedZookeeper = _
override def getConnectString: String = zkServer.getConnectString
override def getConnectString: String = _zkServer.getConnectString
override def beforeAll(): Unit = {
conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
zkServer.initialize(conf)
zkServer.start()
super.beforeAll()
override def startZk(): Unit = {
val embeddedZkConf = KyuubiConf()
embeddedZkConf.set(ZK_CLIENT_PORT, 0)
_zkServer = new EmbeddedZookeeper()
_zkServer.initialize(embeddedZkConf)
_zkServer.start()
}
override def afterAll(): Unit = {
conf.unset(KyuubiConf.SERVER_KEYTAB)
conf.unset(KyuubiConf.SERVER_PRINCIPAL)
conf.unset(HA_ADDRESSES)
zkServer.stop()
super.afterAll()
override def stopZk(): Unit = {
_zkServer.stop()
}
}
abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
with KerberizedTestHelper {
var conf: KyuubiConf = KyuubiConf()
def startZk(): Unit
def stopZk(): Unit
override def beforeEach(): Unit = {
startZk()
conf = new KyuubiConf().set(HA_ADDRESSES, getConnectString)
super.beforeEach()
}
override def afterEach(): Unit = {
super.afterEach()
stopZk()
}
test("acl for zookeeper") {
@ -116,43 +132,33 @@ class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with Kerberized
}
test("stop engine in time while zk ensemble terminates") {
val zkServer = new EmbeddedZookeeper()
val conf = KyuubiConf()
.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
try {
zkServer.initialize(conf)
zkServer.start()
var serviceDiscovery: EngineServiceDiscovery = null
val server = new NoopTBinaryFrontendServer() {
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
new NoopTBinaryFrontendService(this) {
override val discoveryService: Option[Service] = {
serviceDiscovery = new EngineServiceDiscovery(this)
Some(serviceDiscovery)
}
})
}
conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
.set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
.set(HA_ADDRESSES, zkServer.getConnectString)
.set(HA_ZK_SESSION_TIMEOUT, 2000)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
server.initialize(conf)
server.start()
assert(server.getServiceState === ServiceState.STARTED)
var discovery: ServiceDiscovery = null
val service = new NoopTBinaryFrontendServer() {
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
new NoopTBinaryFrontendService(this) {
override val discoveryService: Option[Service] = {
discovery = new EngineServiceDiscovery(this)
Some(discovery)
}
})
}
conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
.set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
.set(HA_ZK_SESSION_TIMEOUT, 2000)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
service.initialize(conf)
service.start()
assert(service.getServiceState === ServiceState.STARTED)
zkServer.stop()
val isServerLostM = serviceDiscovery.getClass.getSuperclass.getDeclaredField("isServerLost")
isServerLostM.setAccessible(true)
val isServerLost = isServerLostM.get(serviceDiscovery)
stopZk()
val isServerLostM = discovery.getClass.getSuperclass.getDeclaredField("isServerLost")
isServerLostM.setAccessible(true)
val isServerLost = isServerLostM.get(discovery)
eventually(timeout(10.seconds), interval(100.millis)) {
assert(isServerLost.asInstanceOf[AtomicBoolean].get())
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
assert(server.getServiceState === ServiceState.STOPPED)
}
} finally {
zkServer.stop()
eventually(timeout(10.seconds), interval(100.millis)) {
assert(isServerLost.asInstanceOf[AtomicBoolean].get())
assert(discovery.getServiceState === ServiceState.STOPPED)
assert(service.getServiceState === ServiceState.STOPPED)
}
}
}