[KYUUBI #543] [TESTS] Add test for scheduler pool
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Improve test coverage. ### _How was this patch tested?_ Pass `org.apache.kyuubi.engine.spark.SchedulerPoolSuite`. Closes #543 from ulysses-you/add-scheduler-pool-test. Closes #543 f5563b1 [ulysses-you] trait 57938a1 [ulysses-you] init Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
This commit is contained in:
parent
105813b1d5
commit
cacc2b05b3
30
externals/kyuubi-spark-sql-engine/src/test/resources/test-scheduler-pool.xml
vendored
Normal file
30
externals/kyuubi-spark-sql-engine/src/test/resources/test-scheduler-pool.xml
vendored
Normal file
@@ -0,0 +1,30 @@
<?xml version="1.0"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->

<!--
  Spark FAIR-scheduler allocation file used by SchedulerPoolSuite.

  Pool p0 declares minShare = 2, which equals the total core count of the
  suite's "local[2]" master, so a running p0 job can claim the entire cluster.
  Pool p1 declares minShare = 0 and the same weight, so under contention a job
  submitted to p1 only receives tasks once p0's minimum share is satisfied.
  The suite relies on this ordering to assert that the p0 job finishes first.
-->
<allocations>
  <pool name="p0">
    <minShare>2</minShare>
    <weight>1</weight>
    <schedulingMode>FAIR</schedulingMode>
  </pool>
  <pool name="p1">
    <minShare>0</minShare>
    <weight>1</weight>
    <schedulingMode>FAIR</schedulingMode>
  </pool>
</allocations>
@@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine.spark
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
|
||||
|
||||
import org.apache.kyuubi.operation.JDBCTestUtils
|
||||
|
||||
/**
 * Verifies that Kyuubi's `kyuubi.operation.scheduler.pool` setting routes Spark
 * jobs into the FAIR scheduler pools defined in `test-scheduler-pool.xml`.
 *
 * Pool `p0` has minShare = 2 — the entire capacity of the `local[2]` master —
 * while `p1` has minShare = 0, so when both pools contend for resources the
 * p0 job should be allocated tasks first and therefore finish first.
 */
class SchedulerPoolSuite extends WithSparkSQLEngine with JDBCTestUtils {
  override protected def jdbcUrl: String = getJdbcUrl

  override def withKyuubiConf: Map[String, String] = {
    // Resolve the fair-scheduler allocation file from the test classpath.
    val poolFile =
      Thread.currentThread().getContextClassLoader.getResource("test-scheduler-pool.xml")
    Map(
      "spark.scheduler.mode" -> "FAIR",
      "spark.scheduler.allocation.file" -> poolFile.getFile,
      // Exactly two cores, so p0's minShare of 2 can occupy all resources.
      "spark.master" -> "local[2]")
  }

  // NOTE: fixed typo in the original test name ("Scheudler pool").
  test("Scheduler pool") {
    @volatile var job0Started = false
    // Start/finish timestamps (epoch millis) captured from the listener bus.
    @volatile var job1StartTime = 0L
    @volatile var job2StartTime = 0L
    @volatile var job1FinishTime = 0L
    @volatile var job2FinishTime = 0L
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        info(jobStart)
        jobStart.jobId match {
          case 1 => job1StartTime = jobStart.time
          case 2 => job2StartTime = jobStart.time
          case 0 => job0Started = true
          // Ignore any other jobs instead of throwing a MatchError inside the
          // listener (onJobEnd below already had this catch-all).
          case _ =>
        }
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        info(jobEnd)
        jobEnd.jobId match {
          case 1 => job1FinishTime = jobEnd.time
          case 2 => job2FinishTime = jobEnd.time
          case _ =>
        }
      }
    }
    spark.sparkContext.addSparkListener(listener)

    try {
      val threads = Executors.newFixedThreadPool(3)
      // Job 0: hold both cores in pool p0 for ~5s so that the two jobs
      // submitted next must queue for resources.
      threads.execute(new Runnable {
        override def run(): Unit = {
          withJdbcStatement() { statement =>
            statement.execute("SET kyuubi.operation.scheduler.pool=p0")
            statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 5000l)" +
              " FROM range(1, 3, 1, 2)")
          }
        }
      })
      // make sure job0 started then we have no resource right now
      eventually(Timeout(3.seconds)) {
        assert(job0Started)
      }
      // Submit the p1 job first and the p0 job second; FAIR scheduling should
      // still favor p0 once resources free up, regardless of submission order.
      Seq(1, 0).foreach { priority =>
        threads.execute(new Runnable {
          override def run(): Unit = {
            priority match {
              case 0 =>
                withJdbcStatement() { statement =>
                  statement.execute("SET kyuubi.operation.scheduler.pool=p0")
                  statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
                    " FROM range(1, 3, 1, 2)")
                }

              case 1 =>
                withJdbcStatement() { statement =>
                  statement.execute("SET kyuubi.operation.scheduler.pool=p1")
                  statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
                    " FROM range(1, 3, 1, 2)")
                }
            }
          }
        })
      }
      threads.shutdown()
      eventually(Timeout(10.seconds)) {
        // We can not ensure that job1 is started before job2 so here using abs.
        assert(Math.abs(job1StartTime - job2StartTime) < 1000)
        // Job1 minShare is 2(total resource) so that job2 should be allocated tasks after
        // job1 finished.
        assert(job2FinishTime - job1FinishTime >= 1000)
      }
    } finally {
      // Always detach the listener so later suites are unaffected.
      spark.sparkContext.removeSparkListener(listener)
    }
  }
}
|
||||
Loading…
Reference in New Issue
Block a user