[KYUUBI #543] [TESTS] Add test for scheduler pool
Some checks failed
Kyuubi / Build (-Pspark-3.0 -Pspark-hadoop-2.7 -Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.1 -Dspark.archive.name=spark-3.1.1-bin-hadoop2.7.tgz -Dmaven.plugin.scalatest.exclude.tags=org.apache.kyuubi.tags.DataLakeTest) (push) Has been cancelled
Kyuubi / Build (-Pspark-3.0 -Pspark-hadoop-2.7) (push) Has been cancelled
Kyuubi / Build (-Pspark-3.1 -Pspark-hadoop-2.7) (push) Has been cancelled
Kyuubi / Build (-Pspark-3.1 -Pspark-hadoop-3.2) (push) Has been cancelled
SL Scan / Scan-Build (push) Has been cancelled

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Improve test coverage.

### _How was this patch tested?_
Pass `org.apache.kyuubi.engine.spark.SchedulerPoolSuite`.

Closes #543 from ulysses-you/add-scheduler-pool-test.

Closes #543

f5563b1 [ulysses-you] trait
57938a1 [ulysses-you] init

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
(cherry picked from commit cacc2b05b3)
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
This commit is contained in:
ulysses-you 2021-04-19 17:09:55 +08:00
parent b15b42cadf
commit 9355c999d3
2 changed files with 143 additions and 0 deletions

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!--
  Fair-scheduler pool definitions used by SchedulerPoolSuite.
  p0 has minShare=2, which equals the total cores of the local[2] test
  cluster, so jobs submitted to p0 are guaranteed all resources before
  p1 (minShare=0) gets any tasks scheduled.
-->
<allocations>
<pool name="p0">
<minShare>2</minShare>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="p1">
<minShare>0</minShare>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
</allocations>

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import java.util.concurrent.Executors
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.operation.JDBCTestUtils
/**
 * Verifies that `kyuubi.operation.scheduler.pool` routes statements into the
 * expected Spark FAIR scheduler pools. Pools are defined in
 * `test-scheduler-pool.xml`: p0 has minShare = 2 (the whole local[2] cluster)
 * while p1 has minShare = 0, so a running p0 job starves p1 until it finishes.
 */
class SchedulerPoolSuite extends WithSparkSQLEngine with JDBCTestUtils {
  override protected def jdbcUrl: String = getJdbcUrl

  override def withKyuubiConf: Map[String, String] = {
    // Resolve the allocation file from the test classpath; local[2] pins the
    // cluster to exactly two cores so p0's minShare of 2 can monopolize it.
    val poolFile =
      Thread.currentThread().getContextClassLoader.getResource("test-scheduler-pool.xml")
    Map(
      "spark.scheduler.mode" -> "FAIR",
      "spark.scheduler.allocation.file" -> poolFile.getFile,
      "spark.master" -> "local[2]")
  }

  test("Scheduler pool") {
    // Timing markers recorded by the listener; jobs 1 and 2 are the two
    // competing statements, job 0 is the long-running statement that holds
    // all resources first.
    @volatile var job0Started = false
    @volatile var job1StartTime = 0L
    @volatile var job2StartTime = 0L
    @volatile var job1FinishTime = 0L
    @volatile var job2FinishTime = 0L
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        info(jobStart)
        jobStart.jobId match {
          case 0 => job0Started = true
          case 1 => job1StartTime = jobStart.time
          case 2 => job2StartTime = jobStart.time
          // Ignore any other jobs (e.g. internal ones) instead of throwing
          // a MatchError inside the listener.
          case _ =>
        }
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        info(jobEnd)
        jobEnd.jobId match {
          case 1 => job1FinishTime = jobEnd.time
          case 2 => job2FinishTime = jobEnd.time
          case _ =>
        }
      }
    }
    spark.sparkContext.addSparkListener(listener)
    try {
      val threads = Executors.newFixedThreadPool(3)
      // Job 0: occupy both cores via pool p0 for ~5s.
      threads.execute(new Runnable {
        override def run(): Unit = {
          withJdbcStatement() { statement =>
            statement.execute("SET kyuubi.operation.scheduler.pool=p0")
            statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 5000l)" +
              " FROM range(1, 3, 1, 2)")
          }
        }
      })
      // Make sure job 0 started, so no free resources remain for later jobs.
      eventually(Timeout(3.seconds)) {
        assert(job0Started)
      }
      // Submit the p1 statement first and the p0 statement second; p0 should
      // still win the resources once job 0 releases them.
      Seq(1, 0).foreach { priority =>
        threads.execute(new Runnable {
          override def run(): Unit = {
            priority match {
              case 0 =>
                withJdbcStatement() { statement =>
                  statement.execute("SET kyuubi.operation.scheduler.pool=p0")
                  statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
                    " FROM range(1, 3, 1, 2)")
                }
              case 1 =>
                withJdbcStatement() { statement =>
                  statement.execute("SET kyuubi.operation.scheduler.pool=p1")
                  statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
                    " FROM range(1, 3, 1, 2)")
                }
            }
          }
        })
      }
      threads.shutdown()
      eventually(Timeout(10.seconds)) {
        // We can not ensure that job1 is started before job2 so here using abs.
        assert(Math.abs(job1StartTime - job2StartTime) < 1000)
        // Job1 minShare is 2 (total resource) so that job2 should be allocated
        // tasks only after job1 finished.
        assert(job2FinishTime - job1FinishTime >= 1000)
      }
    } finally {
      spark.sparkContext.removeSparkListener(listener)
    }
  }
}