diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SchedulerPoolSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SchedulerPoolSuite.scala index af8c90cf2..43bd3f4db 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SchedulerPoolSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SchedulerPoolSuite.scala @@ -19,6 +19,8 @@ package org.apache.kyuubi.engine.spark import java.util.concurrent.Executors +import scala.concurrent.duration.SECONDS + import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart} import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime @@ -76,33 +78,34 @@ class SchedulerPoolSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { eventually(Timeout(3.seconds)) { assert(job0Started) } - Seq(1, 0).foreach { priority => - threads.execute(() => { - priority match { - case 0 => - withJdbcStatement() { statement => - statement.execute("SET kyuubi.operation.scheduler.pool=p0") - statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" + - "FROM range(1, 3, 1, 2)") - } + threads.execute(() => { + // job name job1 + withJdbcStatement() { statement => + statement.execute("SET kyuubi.operation.scheduler.pool=p1") + statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" + + " FROM range(1, 3, 1, 2)") + } + }) + // make sure job1 started before job2 + eventually(Timeout(2.seconds)) { + assert(job1StartTime > 0) + } - case 1 => - withJdbcStatement() { statement => - statement.execute("SET kyuubi.operation.scheduler.pool=p1") - statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" + - " FROM range(1, 3, 1, 2)") - } - } - }) - } + threads.execute(() => { + // job name job2 + withJdbcStatement() { statement => + statement.execute("SET kyuubi.operation.scheduler.pool=p0") + statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" + + "FROM range(1, 3, 1, 2)") + } + }) threads.shutdown() - eventually(Timeout(20.seconds)) { - // We can not ensure that job1 is started before job2 so here using abs. - assert(Math.abs(job1StartTime - job2StartTime) < 1000) - // Job1 minShare is 2(total resource) so that job2 should be allocated tasks after - // job1 finished. - assert(job2FinishTime - job1FinishTime >= 1000) - } + threads.awaitTermination(20, SECONDS) + // job1 should be started before job2 + assert(job1StartTime < job2StartTime) + // job2 minShare is 2(total resource) so that job1 should be allocated tasks after + // job2 finished. + assert(job2FinishTime < job1FinishTime) } finally { spark.sparkContext.removeSparkListener(listener) }