[CELEBORN-727][TEST] Fix flaky test RssHashCheckDiskSuite

### What changes were proposed in this pull request?

Fix the flaky test by enlarging `celeborn.client.shuffle.expired.checkInterval`

### Why are the changes needed?

```
RssHashCheckDiskSuite:
- celeborn spark integration test - hash-checkDiskFull *** FAILED ***
  868 was not less than 0 (RssHashCheckDiskSuite.scala:83)
```

https://github.com/apache/incubator-celeborn/actions/runs/5396767745/jobs/9800766633

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passes GA; CI should continue to be observed after merge.

Closes #1640 from pan3793/CELEBORN-727.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-06-28 17:59:54 +08:00
parent a1199a9895
commit b821349c4a
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
6 changed files with 45 additions and 50 deletions

View File

@@ -44,7 +44,7 @@ class PushDataTimeoutTest extends AnyFunSuite
// enabled, there is a possibility that two workers might be added to the excluded list due to
// master/slave timeout issues, then there are not enough workers to do replication if available
// workers number = 1
setUpMiniCluster(masterConfs = null, workerConfs = workerConf, workerNum = 4)
setUpMiniCluster(masterConf = null, workerConf = workerConf, workerNum = 4)
}
override def beforeEach(): Unit = {

View File

@@ -34,7 +34,7 @@ class RetryCommitFilesTest extends AnyFunSuite
logInfo("test initialized , setup Celeborn mini cluster")
val workerConf = Map(
"celeborn.test.retryCommitFiles" -> s"true")
setUpMiniCluster(masterConfs = null, workerConfs = workerConf)
setUpMiniCluster(masterConf = null, workerConf = workerConf)
}
override def beforeEach(): Unit = {

View File

@@ -32,7 +32,7 @@ class RetryReviveTest extends AnyFunSuite
override def beforeAll(): Unit = {
logInfo("test initialized , setup celeborn mini cluster")
setUpMiniCluster(masterConfs = null)
setUpMiniCluster(masterConf = null)
}
override def beforeEach(): Unit = {

View File

@@ -21,25 +21,25 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.worker.Worker
class RssHashCheckDiskSuite extends AnyFunSuite
with SparkTestBase
with BeforeAndAfterEach {
var workers: collection.Set[Worker] = null
class RssHashCheckDiskSuite extends SparkTestBase {
var workers: collection.Set[Worker] = _
override def beforeAll(): Unit = {
logInfo("RssHashCheckDiskSuite test initialized , setup rss mini cluster")
val masterConfs = Map(CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key -> "10s")
val workerConfs = Map(
logInfo("RssHashCheckDiskSuite test initialized , setup Celeborn mini cluster")
val masterConf = Map(
CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key -> "10s")
val workerConf = Map(
CelebornConf.WORKER_STORAGE_DIRS.key -> "/tmp:capacity=1000",
CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s")
workers = setUpMiniCluster(masterConfs, workerConfs)._2
workers = setUpMiniCluster(masterConf, workerConf)._2
}
override def beforeEach(): Unit = {
@@ -51,54 +51,51 @@ class RssHashCheckDiskSuite extends AnyFunSuite
}
test("celeborn spark integration test - hash-checkDiskFull") {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]").set(
s"spark.${CelebornConf.SHUFFLE_EXPIRED_CHECK_INTERVAL.key}",
"5s")
val sparkConf = new SparkConf().setAppName("rss-demo")
.setMaster("local[2]")
.set(s"spark.${CelebornConf.SHUFFLE_EXPIRED_CHECK_INTERVAL.key}", "20s")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val combineResult = combine(sparkSession)
val groupbyResult = groupBy(sparkSession)
val groupByResult = groupBy(sparkSession)
val repartitionResult = repartition(sparkSession)
val sqlResult = runsql(sparkSession)
Thread.sleep(3000L)
sparkSession.stop()
val rssSparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssGroupByResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
val rssSqlResult = runsql(rssSparkSession)
assert(combineResult.equals(rssCombineResult))
assert(groupbyResult.equals(rssGroupbyResult))
assert(groupByResult.equals(rssGroupByResult))
assert(repartitionResult.equals(rssRepartitionResult))
assert(combineResult.equals(rssCombineResult))
assert(sqlResult.equals(rssSqlResult))
// shuffle key not expired, diskInfo.actualUsableSpace < 0, no space
workers.map(worker => {
worker.storageManager.disksSnapshot().map(diskInfo => {
workers.foreach { worker =>
worker.storageManager.disksSnapshot().foreach { diskInfo =>
assert(diskInfo.actualUsableSpace < 0)
})
})
}
}
rssSparkSession.stop()
// wait shuffle key expired
Thread.sleep(30 * 1000L)
logInfo("after shuffle key expired")
// after shuffle key expired, storageManager.workingDirWriters will be empty
workers.map(worker => {
worker.storageManager.workingDirWriters.values().asScala.map(t => assert(t.size() == 0))
})
// after shuffle key expired, diskInfo.actualUsableSpace will equal capacity=1000
workers.map(worker => {
worker.storageManager.disksSnapshot().map(diskInfo => {
assert(diskInfo.actualUsableSpace == 1000)
})
})
eventually(timeout(60.seconds), interval(2.seconds)) {
workers.foreach { worker =>
// after shuffle key expired, storageManager.workingDirWriters will be empty
worker.storageManager.workingDirWriters.values().asScala.foreach { t =>
assert(t.size() === 0)
}
// after shuffle key expired, diskInfo.actualUsableSpace will equal capacity=1000
worker.storageManager.disksSnapshot().foreach { diskInfo =>
assert(diskInfo.actualUsableSpace === 1000)
}
}
}
}
}

View File

@@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf._
@@ -30,7 +30,7 @@ import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.MiniClusterFeature
trait SparkTestBase extends AnyFunSuite
with Logging with MiniClusterFeature with BeforeAndAfterAll {
with Logging with MiniClusterFeature with BeforeAndAfterAll with BeforeAndAfterEach {
private val sampleSeq = (1 to 78)
.map(Random.alphanumeric)
.toList

View File

@@ -95,25 +95,23 @@ trait MiniClusterFeature extends Logging {
}
def setUpMiniCluster(
masterConfs: Map[String, String] = null,
workerConfs: Map[String, String] = null,
masterConf: Map[String, String] = null,
workerConf: Map[String, String] = null,
workerNum: Int = 3): (Master, collection.Set[Worker]) = {
val master = createMaster(masterConfs)
val master = createMaster(masterConf)
val masterThread = runnerWrap(master.rpcEnv.awaitTermination())
masterThread.start()
masterInfo = (master, masterThread)
Thread.sleep(5000L)
for (_ <- 1 to workerNum) {
val worker = createWorker(workerConfs)
(1 to workerNum).foreach { _ =>
val worker = createWorker(workerConf)
val workerThread = runnerWrap(worker.initialize())
workerThread.start()
workerInfos.put(worker, workerThread)
}
Thread.sleep(5000L)
workerInfos.foreach {
case (worker, _) => assert(worker.registered.get())
}
workerInfos.foreach { case (worker, _) => assert(worker.registered.get()) }
(master, workerInfos.keySet)
}