[KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780)
* [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine
This commit is contained in:
parent
b16533a972
commit
082b182757
@ -21,7 +21,7 @@ import java.sql.{SQLTimeoutException, Statement}
|
||||
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
|
||||
|
||||
import org.apache.spark.TaskKilled
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
import org.scalatest.time.SpanSugar._
|
||||
@ -41,26 +41,37 @@ class SparkEngineSuites extends KyuubiFunSuite {
|
||||
val listener = new SparkListener {
|
||||
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
|
||||
assert(taskEnd.reason.isInstanceOf[TaskKilled])
|
||||
if (forceCancel.get()) {
|
||||
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000)
|
||||
index.incrementAndGet()
|
||||
} else {
|
||||
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000)
|
||||
index.incrementAndGet()
|
||||
}
|
||||
// When OPERATION_FORCE_CANCEL variable is true, and the task execution is cancelled,
|
||||
// the following statement will be executed
|
||||
index.incrementAndGet()
|
||||
}
|
||||
|
||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
|
||||
// Means the query statement is executed
|
||||
index.incrementAndGet()
|
||||
}
|
||||
|
||||
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
|
||||
// Always be executed
|
||||
index.incrementAndGet()
|
||||
}
|
||||
}
|
||||
|
||||
spark.sparkContext.addSparkListener(listener)
|
||||
try {
|
||||
statement.setQueryTimeout(3)
|
||||
statement.setQueryTimeout(5)
|
||||
forceCancel.set(force)
|
||||
val e1 = intercept[SQLTimeoutException] {
|
||||
statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)")
|
||||
statement.execute("select java_method('java.lang.Thread', 'sleep', 500000L)")
|
||||
}.getMessage
|
||||
assert(e1.contains("Query timed out"))
|
||||
assert(index.get() != 0, "The query statement was not executed.")
|
||||
eventually(Timeout(30.seconds)) {
|
||||
assert(index.get() == 1)
|
||||
if (forceCancel.get()) {
|
||||
assert(index.get() == 3)
|
||||
} else {
|
||||
assert(index.get() == 2)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
spark.sparkContext.removeSparkListener(listener)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user