### _Why are the changes needed?_ fix issuse https://github.com/apache/kyuubi/issues/4713 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4714 from huangzhir/fixtest-schedulerpool. Closes #4713 Authored-by: huangzhir <306824224@qq.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
ec8596c9ee
commit
b4d67e4837
@ -19,6 +19,8 @@ package org.apache.kyuubi.engine.spark
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import scala.concurrent.duration.SECONDS
|
||||
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
|
||||
@ -76,33 +78,34 @@ class SchedulerPoolSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
|
||||
eventually(Timeout(3.seconds)) {
|
||||
assert(job0Started)
|
||||
}
|
||||
Seq(1, 0).foreach { priority =>
|
||||
threads.execute(() => {
|
||||
priority match {
|
||||
case 0 =>
|
||||
withJdbcStatement() { statement =>
|
||||
statement.execute("SET kyuubi.operation.scheduler.pool=p0")
|
||||
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
|
||||
"FROM range(1, 3, 1, 2)")
|
||||
}
|
||||
threads.execute(() => {
|
||||
// job name job1
|
||||
withJdbcStatement() { statement =>
|
||||
statement.execute("SET kyuubi.operation.scheduler.pool=p1")
|
||||
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
|
||||
" FROM range(1, 3, 1, 2)")
|
||||
}
|
||||
})
|
||||
// make sure job1 started before job2
|
||||
eventually(Timeout(2.seconds)) {
|
||||
assert(job1StartTime > 0)
|
||||
}
|
||||
|
||||
case 1 =>
|
||||
withJdbcStatement() { statement =>
|
||||
statement.execute("SET kyuubi.operation.scheduler.pool=p1")
|
||||
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
|
||||
" FROM range(1, 3, 1, 2)")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
threads.execute(() => {
|
||||
// job name job2
|
||||
withJdbcStatement() { statement =>
|
||||
statement.execute("SET kyuubi.operation.scheduler.pool=p0")
|
||||
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
|
||||
"FROM range(1, 3, 1, 2)")
|
||||
}
|
||||
})
|
||||
threads.shutdown()
|
||||
eventually(Timeout(20.seconds)) {
|
||||
// We can not ensure that job1 is started before job2 so here using abs.
|
||||
assert(Math.abs(job1StartTime - job2StartTime) < 1000)
|
||||
// Job1 minShare is 2(total resource) so that job2 should be allocated tasks after
|
||||
// job1 finished.
|
||||
assert(job2FinishTime - job1FinishTime >= 1000)
|
||||
}
|
||||
threads.awaitTermination(20, SECONDS)
|
||||
// job1 should be started before job2
|
||||
assert(job1StartTime < job2StartTime)
|
||||
// job2 minShare is 2(total resource) so that job1 should be allocated tasks after
|
||||
// job2 finished.
|
||||
assert(job2FinishTime < job1FinishTime)
|
||||
} finally {
|
||||
spark.sparkContext.removeSparkListener(listener)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user