diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml index 5967a48dd..c4ae29268 100644 --- a/.github/workflows/sbt.yml +++ b/.github/workflows/sbt.yml @@ -38,6 +38,9 @@ jobs: - 8 - 11 - 17 + scala: + - '2.12.15' + - '2.13.5' steps: - uses: actions/checkout@v2 - name: Setup JDK ${{ matrix.java }} @@ -48,12 +51,12 @@ jobs: check-latest: false - name: Test Service with SBT run: | - build/sbt "clean; test" + build/sbt ++${{ matrix.scala }} "clean; test" - name: Upload test log if: failure() uses: actions/upload-artifact@v3 with: - name: service-${{ matrix.java }}-unit-test-log + name: service-java-${{ matrix.java }}-scala-${{ matrix.scala }}-unit-test-log path: | **/target/test-reports/** @@ -90,24 +93,74 @@ jobs: strategy: fail-fast: false matrix: - java: - - 8 - - 11 - - 17 - spark: - - '3.0' - - '3.1' - - '3.2' - - '3.3' - - '3.4' - exclude: + include: + # Spark 3.0 + - spark: '3.0' + scala: '2.12.10' + java: 8 + - spark: '3.0' + scala: '2.12.10' + java: 11 + # Spark 3.1 + - spark: '3.1' + scala: '2.12.10' + java: 8 + - spark: '3.1' + scala: '2.12.10' + java: 11 + # Spark supports scala 2.13 since 3.2.0 + # Spark 3.2 + - spark: '3.2' + scala: '2.12.15' + java: 8 + - spark: '3.2' + scala: '2.12.15' + java: 11 + - spark: '3.2' + scala: '2.13.5' + java: 8 + - spark: '3.2' + scala: '2.13.5' + java: 11 # SPARK-33772: Spark supports JDK 17 since 3.3.0 - - java: 17 - spark: '3.0' - - java: 17 - spark: '3.1' - - java: 17 - spark: '3.2' + # Spark 3.3 + - spark: '3.3' + scala: '2.12.15' + java: 8 + - spark: '3.3' + scala: '2.12.15' + java: 11 + - spark: '3.3' + scala: '2.12.15' + java: 17 + - spark: '3.3' + scala: '2.13.5' + java: 8 + - spark: '3.3' + scala: '2.13.5' + java: 11 + - spark: '3.3' + scala: '2.13.5' + java: 17 + # Spark 3.4 + - spark: '3.4' + scala: '2.12.17' + java: 8 + - spark: '3.4' + scala: '2.12.17' + java: 11 + - spark: '3.4' + scala: '2.12.17' + java: 17 + - spark: '3.4' + scala: '2.13.5' + java: 8 + - spark: '3.4' + scala: '2.13.5' + java: 11 + - spark: '3.4' + scala: '2.13.5' + java: 17 steps: - uses: actions/checkout@v2 - name: Setup JDK ${{ matrix.java }} @@ -118,12 +171,12 @@ jobs: check-latest: false - name: Test with SBT run: | - build/sbt -Pspark-${{ matrix.spark }} "clean; celeborn-spark-group/test" + build/sbt -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean; celeborn-spark-group/test" - name: Upload test log if: failure() uses: actions/upload-artifact@v3 with: - name: spark-${{ matrix.spark }}-unit-test-log + name: spark-${{ matrix.spark }}-scala-${{ matrix.scala }}-unit-test-log path: | **/target/test-reports/** diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index f211f9ba6..0baacd10e 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -117,7 +117,7 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage Math.min(workerToRequests.size, conf.clientRpcMaxParallelism) try { ThreadUtils.parmap( - workerToRequests.to, + workerToRequests, "CommitFiles", parallelism) { case (worker, requests) => diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 6e052a0b9..85748f4f8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -424,7 +424,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // Second, for each worker, try to initialize the endpoint. val parallelism = Math.min(Math.max(1, slots.size()), conf.clientRpcMaxParallelism) - ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case (workerInfo, _) => + ThreadUtils.parmap(slots.asScala, "InitWorkerRef", parallelism) { case (workerInfo, _) => try { workerInfo.endpoint = rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host, workerInfo.rpcPort), WORKER_EP) @@ -648,7 +648,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty || !p._2._2.isEmpty) val parallelism = Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism) - ThreadUtils.parmap(workerPartitionLocations.to, "ReserveSlot", parallelism) { + ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) { case (workerInfo, (primaryLocations, replicaLocations)) => val res = requestWorkerReserveSlots( workerInfo.endpoint, diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala index 6a011e670..5f17e4ef0 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala @@ -197,7 +197,7 @@ abstract class CommitHandler( val workerPartitionLocations = allocatedWorkers.asScala.filter(!_._2.isEmpty) val parallelism = Math.min(workerPartitionLocations.size, conf.clientRpcMaxParallelism) ThreadUtils.parmap( - workerPartitionLocations.to, + workerPartitionLocations, "CommitFiles", parallelism) { case (worker, partitionLocationInfo) => val primaryParts = diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index ab11a7952..6bd49a1a0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -131,7 +131,7 @@ class DiskInfo( val (emptyShuffles, nonEmptyShuffles) = shuffleAllocations.asScala.partition(_._2 == 0) s"DiskInfo(maxSlots: $maxSlots," + s" committed shuffles ${emptyShuffles.size}" + - s" shuffleAllocations: $nonEmptyShuffles," + + s" shuffleAllocations: ${nonEmptyShuffles.toMap}," + s" mountPoint: $mountPoint," + s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," + s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," + diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala index 0d20cf184..fdc1fef0b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala @@ -81,7 +81,7 @@ class MetricsSystem( } def getSourcesByName(sourceName: String): Seq[Source] = - sources.filter(_.sourceName == sourceName) + sources.filter(_.sourceName == sourceName).toSeq def registerSource(source: Source) { sources += source diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index d376832a3..89ef41a93 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -802,7 +802,8 @@ object ControlMessages extends Logging { val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]() val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption( pbHeartbeatFromWorker.getUserResourceConsumptionMap) - val pbDisks = pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo) + val pbDisks = + pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo) if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) { estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap) } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala index ca5066c34..417e3650b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala @@ -20,8 +20,6 @@ package org.apache.celeborn.common.util import java.util.concurrent._ import java.util.concurrent.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} -import scala.collection.TraversableLike -import scala.collection.generic.CanBuildFrom import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} import scala.language.higherKinds @@ -287,13 +285,7 @@ object ThreadUtils { * @return new collection in which each element was given from the input collection `in` by * applying the lambda function `f`. */ - def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]( - in: Col[I], - prefix: String, - maxThreads: Int)(f: I => O)(implicit - cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map - cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence - ): Col[O] = { + def parmap[I, O](in: Iterable[I], prefix: String, maxThreads: Int)(f: I => O): Iterable[O] = { val pool = newForkJoinPool(prefix, maxThreads) try { implicit val ec = ExecutionContext.fromExecutor(pool) diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index ff9157448..bce77cc68 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -162,7 +162,7 @@ class UtilsSuite extends CelebornFunSuite { GetReducerFileGroupResponse] assert(response.status == responseTrans.status) - assert(response.attempts.deep == responseTrans.attempts.deep) + assert(util.Arrays.equals(response.attempts, responseTrans.attempts)) val set = (response.fileGroup.values().toArray diff responseTrans.fileGroup.values().toArray).toSet assert(set.size == 0) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index aeea6c232..fbd1dc1e9 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -17,7 +17,7 @@ package org.apache.celeborn.service.deploy.worker.storage -import java.io._ +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, IOException} import java.nio.charset.Charset import java.util import java.util.concurrent.TimeUnit @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import org.apache.commons.io.FileUtils -import org.slf4j.LoggerFactory import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging