From 082b182757f4f5a6aa6a3936dc4beb5945afd964 Mon Sep 17 00:00:00 2001 From: timothy65535 <86483005+timothy65535@users.noreply.github.com> Date: Tue, 13 Jul 2021 19:34:03 +0800 Subject: [PATCH] [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780) * [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine --- .../engine/spark/IndividualSparkSuite.scala | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala index 829eaf7e1..4fe2cf6d4 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala @@ -21,7 +21,7 @@ import java.sql.{SQLTimeoutException, Statement} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import org.apache.spark.TaskKilled -import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd} import org.apache.spark.sql.SparkSession import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ @@ -41,26 +41,37 @@ class SparkEngineSuites extends KyuubiFunSuite { val listener = new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { assert(taskEnd.reason.isInstanceOf[TaskKilled]) - if (forceCancel.get()) { - assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000) - index.incrementAndGet() - } else { - assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000) - index.incrementAndGet() - } + // When OPERATION_FORCE_CANCEL variable is true, and the task execution is cancelled, + // the following statement will be executed + index.incrementAndGet() + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + // Means the query statement is executed + index.incrementAndGet() + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + // Always be executed + index.incrementAndGet() } } spark.sparkContext.addSparkListener(listener) try { - statement.setQueryTimeout(3) + statement.setQueryTimeout(5) forceCancel.set(force) val e1 = intercept[SQLTimeoutException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)") + statement.execute("select java_method('java.lang.Thread', 'sleep', 500000L)") }.getMessage assert(e1.contains("Query timed out")) + assert(index.get() != 0, "The query statement was not executed.") eventually(Timeout(30.seconds)) { - assert(index.get() == 1) + if (forceCancel.get()) { + assert(index.get() == 3) + } else { + assert(index.get() == 2) + } } } finally { spark.sparkContext.removeSparkListener(listener)