[KYUUBI #302]fix zk client not release if open session failed

![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![PR 302](https://badgen.net/badge/Preview/PR%20302/blue)](https://github.com/yaooqinn/kyuubi/pull/302) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->

<!-- replace ${issue ID} with the actual issue id -->
Fixes #302

<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the user case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Fix zk client leak problem.

- Add some test cases that check the changes thoroughly including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
This commit is contained in:
ulysses-you 2021-01-20 14:43:17 +08:00 committed by Kent Yao
parent 5eed2f7b50
commit 666593c426
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D

View File

@ -100,45 +100,47 @@ class KyuubiSessionImpl(
super.open()
// Init zookeeper client here to capture errors
zkClient
getServerHost match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
try {
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
}
val Some((host, port)) = sh
openSession(host, port)
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
}
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient after session established", e)
getServerHost match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
try {
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
}
val Some((host, port)) = sh
openSession(host, port)
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
}
} finally {
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient after session established", e)
}
}
}