From 3afd20e03aaafff106da596208c3a2a89b2cab24 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 25 Aug 2021 19:21:35 +0800 Subject: [PATCH] [KYUUBI #985] Add logCaptureThreadReleased flag ### _Why are the changes needed?_ `isInterrupted` is always false if some `InterruptedException` are throwed. So we should use a new flag to check if log capture thread is released. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #985 from ulysses-you/flaky-test. Closes #985 1fcc0a99 [ulysses-you] empty 463a996f [ulysses-you] empty d1f2ea54 [ulysses-you] eventually f9f37343 [ulysses-you] fix Authored-by: ulysses-you Signed-off-by: Cheng Pan --- .../main/scala/org/apache/kyuubi/engine/ProcBuilder.scala | 5 ++++- .../kyuubi/engine/spark/SparkProcessBuilderSuite.scala | 7 +++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 41d7f8726..6e48170e7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -65,7 +65,8 @@ trait ProcBuilder { @volatile private var error: Throwable = UNCAUGHT_ERROR @volatile private var lastRowOfLog: String = "unknown" // Visible for test - private[kyuubi] var logCaptureThread: Thread = _ + @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true + private var logCaptureThread: Thread = _ private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized { val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT) @@ -136,10 +137,12 @@ trait ProcBuilder { case _: IOException => case _: InterruptedException => } finally { + logCaptureThreadReleased = true reader.close() } } + logCaptureThreadReleased = false logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect) logCaptureThread.start() proc diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 6f2b2e159..592822946 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -126,13 +126,16 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper { test("log capture should release after close") { val process = new FakeSparkProcessBuilder(KyuubiConf()) try { + assert(process.logCaptureThreadReleased) val subProcess = process.start - assert(!process.logCaptureThread.isInterrupted) + assert(!process.logCaptureThreadReleased) subProcess.waitFor(3, TimeUnit.SECONDS) } finally { process.close() } - assert(process.logCaptureThread.isInterrupted) + eventually(timeout(3.seconds), interval(100.milliseconds)) { + assert(process.logCaptureThreadReleased) + } } test(s"sub process log should be overwritten") {