diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 41d7f8726..6e48170e7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -65,7 +65,8 @@ trait ProcBuilder { @volatile private var error: Throwable = UNCAUGHT_ERROR @volatile private var lastRowOfLog: String = "unknown" // Visible for test - private[kyuubi] var logCaptureThread: Thread = _ + @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true + private var logCaptureThread: Thread = _ private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized { val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT) @@ -136,10 +137,12 @@ trait ProcBuilder { case _: IOException => case _: InterruptedException => } finally { + logCaptureThreadReleased = true reader.close() } } + logCaptureThreadReleased = false logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect) logCaptureThread.start() proc diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 6f2b2e159..592822946 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -126,13 +126,16 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper { test("log capture should release after close") { val process = new FakeSparkProcessBuilder(KyuubiConf()) try { + assert(process.logCaptureThreadReleased) val subProcess = process.start - assert(!process.logCaptureThread.isInterrupted) + assert(!process.logCaptureThreadReleased) subProcess.waitFor(3, TimeUnit.SECONDS) } finally { process.close() } - assert(process.logCaptureThread.isInterrupted) + eventually(timeout(3.seconds), interval(100.milliseconds)) { + assert(process.logCaptureThreadReleased) + } } test(s"sub process log should be overwritten") {