[KYUUBI #386] Clean up discovery service before engine stop in connection share level

![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![Closes #387](https://badgen.net/badge/Preview/Closes%20%23387/blue)](https://github.com/yaooqinn/kyuubi/pull/387) ![330](https://badgen.net/badge/%2B/330/red) ![52](https://badgen.net/badge/-/52/green) ![18](https://badgen.net/badge/commits/18/yellow) ![Test Plan](https://badgen.net/badge/Missing/Test%20Plan/ff0000) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
In connection share level, we release the engine when the session connection is closed, but we do not clean up the discovery service (i.e. the namespace in ZooKeeper). That may cause disk pressure on the ZooKeeper side after running for a long time.

### _How was this patch tested?_
Add some new suites in the Spark engine module with an embedded ZooKeeper server.

Closes #387 from ulysses-you/issue-386.

ccb1112 [ulysses-you] move method
1558dd9 [ulysses-you] test
6cd7e57 [ulysses-you] nit
f8559d2 [ulysses-you] check level at engine discovery
eedfaee [ulysses-you] split ServiceDiscovery to server and engine
8b20e6c [ulysses-you] timeout
0cf524c [ulysses-you] remove unnecessary test
41d36f9 [ulysses-you] move exception
eaaa12d [ulysses-you] avoid stop twice
b801c14 [ulysses-you] fix test
db39960 [ulysses-you] add start check
2d1f6dd [ulysses-you] add state check
e7374aa [ulysses-you] nit
ddf383c [ulysses-you] fi
ee088be [ulysses-you] nit
3c2013b [ulysses-you] nit
73b386f [ulysses-you] improve conf of test
2185f49 [ulysses-you] init

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
ulysses-you 2021-03-04 11:31:41 +08:00 committed by Kent Yao
parent 8e8679e92b
commit aa878e4d48
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
14 changed files with 332 additions and 54 deletions

View File

@ -90,6 +90,12 @@
<artifactId>${iceberg.name}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -27,7 +27,7 @@ import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
@ -37,7 +37,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
private val discoveryService = new ServiceDiscovery(this)
private val discoveryService = new EngineServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import java.util.UUID
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARED_LEVEL
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.JDBCTestUtils
import org.apache.kyuubi.service.ServiceState
/**
 * This suite tests some behavior of the Spark engine under different share levels,
 * e.g. cleanup of the discovery service (zk namespace) before stop.
 */
abstract class ShareLevelSparkEngineSuite
extends WithDiscoverySparkSQLEngine with JDBCTestUtils {
// The share level under test; pinned by each concrete suite below.
def sharedLevel: ShareLevel
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++ Map(ENGINE_SHARED_LEVEL.key-> sharedLevel.toString)
}
override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
// for test, we always use a random UUID as the namespace so every run
// registers under a fresh, unique zk path
s"/kyuubi/${sharedLevel.toString}/${UUID.randomUUID().toString}"
}
test("check discovery service is clean up with different share level") {
withZkClient { zkClient =>
// Engine was started by the per-test lifecycle and registered itself in zk.
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
// Open and then close one JDBC session against the engine.
withJdbcStatement() {_ => }
sharedLevel match {
// Connection share level: closing the session stops the engine, and the
// namespace is cleaned up since it is always a globally unique value.
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) == null)
// Any other level: the engine keeps running and stays registered in zk.
case _ =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
}
}
}
}
// Exercises the suite with CONNECTION share level (namespace must be cleaned up).
class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
override def sharedLevel: ShareLevel = ShareLevel.CONNECTION
}
// Exercises the suite with USER share level (namespace must be left in place).
class UserLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {
override def sharedLevel: ShareLevel = ShareLevel.USER
}

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark
import org.apache.kyuubi.service.ServiceState
class SparkSQLEngineListenerSuite extends WithSparkSQLEngine {
override def conf: Map[String, String] = Map.empty
override def withKyuubiConf: Map[String, String] = Map.empty
test("application end") {
assert(engine.getServiceState === ServiceState.STARTED)

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import org.apache.curator.framework.CuratorFramework
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{EMBEDDED_ZK_PORT, EMBEDDED_ZK_TEMP_DIR}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ACL_ENABLED, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.server.EmbeddedZkServer
/**
 * Test mixin that runs a Spark SQL engine against an embedded ZooKeeper server
 * so that service-discovery behavior can be asserted from tests.
 */
trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
// Embedded ZooKeeper server acting as the discovery backend for these tests.
private var zkServer: EmbeddedZkServer = _
// The zk namespace (path) the engine registers under; supplied by concrete suites.
def namespace: String
override def withKyuubiConf: Map[String, String] = {
// zkServer must already be running: its connect string is part of the conf.
assert(zkServer != null)
Map(HA_ZK_QUORUM.key -> zkServer.getConnectString,
HA_ZK_ACL_ENABLED.key -> "false",
HA_ZK_NAMESPACE.key -> namespace)
}
override def beforeAll(): Unit = {
// NOTE(review): super.beforeAll() is deliberately not invoked here — presumably so
// the engine start is deferred to beforeEach, after zkServer is up (withKyuubiConf
// reads zkServer). Confirm against WithSparkSQLEngine.beforeAll.
zkServer = new EmbeddedZkServer()
val zkData = Utils.createTempDir()
val tmpConf = KyuubiConf()
// NOTE(review): -1 presumably means "pick an arbitrary free port" — confirm
// the EMBEDDED_ZK_PORT semantics.
tmpConf.set(EMBEDDED_ZK_PORT, -1)
tmpConf.set(EMBEDDED_ZK_TEMP_DIR, zkData.toString)
zkServer.initialize(tmpConf)
zkServer.start()
}
override def afterAll(): Unit = {
if (zkServer != null) {
zkServer.stop()
}
}
// Start/stop a fresh engine around every test so each test observes a clean
// registration in zk.
override protected def beforeEach(): Unit = {
super.beforeEach()
startSparkEngine()
}
override protected def afterEach(): Unit = {
super.afterEach()
stopSparkEngine()
}
// Runs `f` with a zk client built from the current kyuubiConf; the client is
// always closed afterwards.
def withZkClient(f: CuratorFramework => Unit): Unit = {
val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf)
try {
f(zkClient)
} finally {
zkClient.close()
}
}
// Connect string of the embedded zk server, or "" when it is not running.
protected def getDiscoveryConnectionString: String = {
if (zkServer == null) {
""
} else {
zkServer.getConnectString
}
}
}

View File

@ -20,11 +20,14 @@ package org.apache.kyuubi.engine.spark
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
trait WithSparkSQLEngine extends KyuubiFunSuite {
protected var spark: SparkSession = _
protected var engine: SparkSQLEngine = _
def conf: Map[String, String]
// conf will be loaded until start spark engine
def withKyuubiConf: Map[String, String]
val kyuubiConf: KyuubiConf = SparkSQLEngine.kyuubiConf
protected var connectionUrl: String = _
@ -42,9 +45,9 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
s"jdbc:derby:;databaseName=$metastorePath;create=true")
System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
conf.foreach { case (k, v) =>
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
SparkSQLEngine.kyuubiConf.set(k, v)
kyuubiConf.set(k, v)
}
SparkSession.clearActiveSession()
@ -56,10 +59,17 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
override def afterAll(): Unit = {
super.afterAll()
stopSparkEngine()
}
protected def stopSparkEngine(): Unit = {
// we need to clean up conf since it's the global config in same jvm.
withKyuubiConf.foreach { case (k, _) =>
System.clearProperty(k)
kyuubiConf.unset(k)
}
if (engine != null) {
engine.stop()
engine = null

View File

@ -22,7 +22,7 @@ import org.apache.kyuubi.operation.BasicIcebergJDBCTests
class SparkIcebergOperationSuite extends WithSparkSQLEngine with BasicIcebergJDBCTests {
override protected def jdbcUrl: String = getJdbcUrl
override def conf: Map[String, String] = icebergConfigs
override def withKyuubiConf: Map[String, String] = icebergConfigs
override def afterAll(): Unit = {
super.afterAll()

View File

@ -39,7 +39,7 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests {
override protected def jdbcUrl: String = getJdbcUrl
override def conf: Map[String, String] = Map.empty
override def withKyuubiConf: Map[String, String] = Map.empty
test("get table types") {
withJdbcStatement() { statement =>

View File

@ -27,7 +27,7 @@ import org.apache.kyuubi.operation.JDBCTestUtils
import org.apache.kyuubi.service.ServiceState._
class SessionSuite extends WithSparkSQLEngine with JDBCTestUtils {
override def conf: Map[String, String] = {
override def withKyuubiConf: Map[String, String] = {
Map(ENGINE_SHARED_LEVEL.key -> "CONNECTION")
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.client
import scala.util.control.NonFatal
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARED_LEVEL
import org.apache.kyuubi.service.Serverable
/**
 * A service for service discovery used by the engine side.
 *
 * In addition to the base [[ServiceDiscovery]] behavior, on stop it deletes the
 * engine's namespace from ZooKeeper when the share level is CONNECTION, since a
 * connection-level namespace is globally unique and would otherwise accumulate.
 *
 * @param name the name of the service itself
 *             (NOTE(review): not forwarded to the parent constructor, which is
 *             called as ServiceDiscovery(server) — confirm whether intended)
 * @param server the instance uri a service that used to publish itself
 */
class EngineServiceDiscovery private(
name: String,
server: Serverable) extends ServiceDiscovery(server) {
def this(server: Serverable) =
this(classOf[EngineServiceDiscovery].getSimpleName, server)
override def stop(): Unit = {
conf.get(ENGINE_SHARED_LEVEL) match {
// For connection share level, clean up the namespace in zk to avoid disk pressure.
case "CONNECTION" if namespace != null =>
try {
// Recursively delete the namespace znode and all of its children.
zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
info("Clean up discovery service due to this is connection share level.")
} catch {
// Best-effort cleanup: log and keep stopping on any non-fatal failure.
case NonFatal(e) =>
warn("Failed to clean up Spark engine before stop.", e)
}
case _ =>
}
super.stop()
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.ha.client
import org.apache.zookeeper.{WatchedEvent, Watcher}
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.service.Serverable
/**
 * A service for service discovery used by the kyuubi server side.
 * We add another zk watch so that we can stop the server more gracefully.
 *
 * @param name the name of the service itself
 *             (NOTE(review): not forwarded to the parent constructor, which is
 *             called as ServiceDiscovery(server) — confirm whether intended)
 * @param server the instance uri a service that used to publish itself
 */
class KyuubiServiceDiscovery private(
name: String,
server: Serverable) extends ServiceDiscovery(server) {
def this(server: Serverable) =
this(classOf[KyuubiServiceDiscovery].getSimpleName, server)
override def start(): Unit = {
super.start()
// Set a watch on the serviceNode so de-registration triggers graceful shutdown.
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
// No node exists, throw exception
throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
s"instance[${server.connectionUrl}] on ZooKeeper.")
}
}
// Stop discovery first, then wait until all open sessions complete before
// stopping the server itself.
override def stopGracefully(): Unit = {
stop()
// Poll once a minute while sessions remain open.
while (server.backendService != null &&
server.backendService.sessionManager.getOpenSessionCount > 0) {
Thread.sleep(1000 * 60)
}
server.stop()
}
// Watches the service znode; when it is deleted (i.e. this instance is
// de-registered), shuts the server down gracefully.
class DeRegisterWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
if (event.getType == Watcher.Event.EventType.NodeDeleted) {
warn(s"This Kyuubi instance ${server.connectionUrl} is now de-registered from" +
s" ZooKeeper. The server will be shut down after the last client session completes.")
stopGracefully()
}
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
@ -33,8 +34,8 @@ import org.apache.curator.retry._
import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException}
@ -45,34 +46,37 @@ import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
/**
* A service for service discovery
* An abstract service for service discovery
*
* @param name the name of the service itself
* @param server the instance uri a service that used to publish itself
*/
class ServiceDiscovery private (
abstract class ServiceDiscovery private (
name: String,
server: Serverable) extends AbstractService(name) {
def this(server: Serverable) =
this(classOf[ServiceDiscovery].getSimpleName, server)
private var zkClient: CuratorFramework = _
private var serviceNode: PersistentEphemeralNode = _
private var _zkClient: CuratorFramework = _
private var _serviceNode: PersistentEphemeralNode = _
/**
* a pre-defined namespace used to publish the instance of the associate service
*/
private var namespace: String = _
private var _namespace: String = _
def zkClient: CuratorFramework = _zkClient
def serviceNode: PersistentEphemeralNode = _serviceNode
def namespace: String = _namespace
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
namespace = conf.get(HA_ZK_NAMESPACE)
_namespace = conf.get(HA_ZK_NAMESPACE)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
setUpZooKeeperAuth(conf)
zkClient = buildZookeeperClient(conf)
_zkClient = buildZookeeperClient(conf)
zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
private val isConnected = new AtomicBoolean(false)
override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
@ -86,7 +90,6 @@ class ServiceDiscovery private (
override def run(): Unit = if (!isConnected.get()) {
error(s"Zookeeper client connection state changed to: $newState, but failed to" +
s" reconnect in ${delay / 1000} seconds. Give up retry. ")
client.close()
stopGracefully()
}
}, delay, TimeUnit.MILLISECONDS)
@ -116,7 +119,7 @@ class ServiceDiscovery private (
namespace,
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=")
try {
serviceNode = new PersistentEphemeralNode(
_serviceNode = new PersistentEphemeralNode(
zkClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL,
pathPrefix,
@ -126,13 +129,6 @@ class ServiceDiscovery private (
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) {
throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
}
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
// No node exists, throw exception
throw new KyuubiException(s"Unable to create znode for this Kyuubi instance[$instance]" +
s" on ZooKeeper.")
}
info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
} catch {
case e: Exception =>
@ -146,38 +142,30 @@ class ServiceDiscovery private (
}
override def stop(): Unit = {
if (serviceNode != null) {
try {
serviceNode.close()
} catch {
case e: IOException =>
error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
}
}
closeServiceNode()
if (zkClient != null) zkClient.close()
super.stop()
}
private def stopGracefully(): Unit = {
stop()
while (server.backendService != null &&
server.backendService.sessionManager.getOpenSessionCount > 0) {
Thread.sleep(1000 * 60)
}
server.stop()
}
class DeRegisterWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
if (event.getType == Watcher.Event.EventType.NodeDeleted) {
warn(s"This Kyuubi instance ${server.connectionUrl} is now de-registered from" +
s" ZooKeeper. The server will be shut down after the last client session completes.")
ServiceDiscovery.this.stopGracefully()
// close the EPHEMERAL_SEQUENTIAL node in zk
protected def closeServiceNode(): Unit = {
if (_serviceNode != null) {
try {
_serviceNode.close()
} catch {
case e: IOException =>
error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
} finally {
_serviceNode = null
}
}
}
// stop the server gracefully
def stopGracefully(): Unit = {
stop()
}
}
object ServiceDiscovery {

View File

@ -66,7 +66,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
server.start()
val znodeRoot = s"/$namespace"
val serviceDiscovery = new ServiceDiscovery(server)
val serviceDiscovery = new KyuubiServiceDiscovery(server)
val framework = ServiceDiscovery.startZookeeperClient(conf)
try {
serviceDiscovery.initialize(conf)

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.{KyuubiServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.ha.server.EmbeddedZkServer
import org.apache.kyuubi.service.{AbstractBackendService, KinitAuxiliaryService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@ -79,7 +79,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
def this() = this(classOf[KyuubiServer].getSimpleName)
override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
private val discoveryService = new ServiceDiscovery(this)
private val discoveryService = new KyuubiServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()