Deploy yaooqinn/kyuubi to github.com/yaooqinn/kyuubi.git:gh-pages
This commit is contained in:
parent
1af58c40fa
commit
56f53d1215
@ -56,6 +56,8 @@ trait ProcBuilder {
|
||||
}
|
||||
|
||||
@volatile private var error: Throwable = UNCAUGHT_ERROR
|
||||
// Visible for test
|
||||
private[kyuubi] var logCaptureThread: Thread = null
|
||||
|
||||
final def start: Process = synchronized {
|
||||
val procLog = Paths.get(workingDir.toAbsolutePath.toString,
|
||||
@ -86,15 +88,23 @@ trait ProcBuilder {
|
||||
}
|
||||
} catch {
|
||||
case _: IOException =>
|
||||
case _: InterruptedException =>
|
||||
} finally {
|
||||
reader.close()
|
||||
}
|
||||
}
|
||||
|
||||
PROC_BUILD_LOGGER.newThread(redirect).start()
|
||||
logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect)
|
||||
logCaptureThread.start()
|
||||
proc
|
||||
}
|
||||
|
||||
def close(): Unit = {
|
||||
if (logCaptureThread != null) {
|
||||
logCaptureThread.interrupt()
|
||||
}
|
||||
}
|
||||
|
||||
def getError: Throwable = synchronized {
|
||||
if (error == UNCAUGHT_ERROR) {
|
||||
Thread.sleep(3000)
|
||||
|
||||
@ -106,28 +106,35 @@ class KyuubiSessionImpl(
|
||||
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
|
||||
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
|
||||
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
|
||||
val process = builder.start
|
||||
info(s"Launching SQL engine: $builder")
|
||||
var sh = getServerHost
|
||||
val started = System.currentTimeMillis()
|
||||
var exitValue: Option[Int] = None
|
||||
while (sh.isEmpty) {
|
||||
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
|
||||
exitValue = Some(process.exitValue())
|
||||
if (exitValue.get != 0) {
|
||||
throw builder.getError
|
||||
try {
|
||||
val process = builder.start
|
||||
info(s"Launching SQL engine: $builder")
|
||||
var sh = getServerHost
|
||||
val started = System.currentTimeMillis()
|
||||
var exitValue: Option[Int] = None
|
||||
while (sh.isEmpty) {
|
||||
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
|
||||
exitValue = Some(process.exitValue())
|
||||
if (exitValue.get != 0) {
|
||||
throw builder.getError
|
||||
}
|
||||
}
|
||||
if (started + timeout <= System.currentTimeMillis()) {
|
||||
process.destroyForcibly()
|
||||
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
|
||||
builder.getError)
|
||||
}
|
||||
sh = getServerHost
|
||||
}
|
||||
if (started + timeout <= System.currentTimeMillis()) {
|
||||
process.destroyForcibly()
|
||||
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
|
||||
builder.getError)
|
||||
}
|
||||
sh = getServerHost
|
||||
val Some((host, port)) = sh
|
||||
openSession(host, port)
|
||||
} finally {
|
||||
// we must close the process builder whether session open is success or failure since
|
||||
// we have a log capture thread in process builder.
|
||||
builder.close()
|
||||
}
|
||||
val Some((host, port)) = sh
|
||||
openSession(host, port)
|
||||
}
|
||||
|
||||
try {
|
||||
zkClient.close()
|
||||
} catch {
|
||||
|
||||
@ -99,4 +99,20 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
|
||||
assert(!b6.toString.contains("--proxy-user kentyao"))
|
||||
}
|
||||
}
|
||||
|
||||
test("log capture should release after close") {
|
||||
val process = new FakeSparkProcessBuilder
|
||||
try {
|
||||
val subProcess = process.start
|
||||
assert(!process.logCaptureThread.isInterrupted)
|
||||
subProcess.waitFor(3, TimeUnit.SECONDS)
|
||||
} finally {
|
||||
process.close()
|
||||
}
|
||||
assert(process.logCaptureThread.isInterrupted)
|
||||
}
|
||||
}
|
||||
|
||||
class FakeSparkProcessBuilder extends SparkProcessBuilder("fake", Map.empty) {
|
||||
override protected def commands: Array[String] = Array("ls")
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user