From d6c4334a11ee3ba16ffe1dcbfbb31ed43ef8bd6c Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Tue, 22 Aug 2023 20:35:05 +0800 Subject: [PATCH] [CELEBORN-901] Add support for Scala 2.13 ### What changes were proposed in this pull request? This PR introduces support for Scala 2.13 1. Resolved a compilation issue specific to Scala 2.13 2. Successfully validated compatibility with Scala 2.13 through the comprehensive suite of unit tests 3. Enabled SBT CI for Scala 2.13 within the "server" module and the "spark client" For more detailed guidance on migrating to Scala 2.13, please consult the following resources: 1. https://www.scala-lang.org/blog/2017/02/28/collections-rework.html 2. https://docs.scala-lang.org/overviews/core/collections-migration-213.html ### Why are the changes needed? As title ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GA Closes #1825 from cfmcgrady/scala213. Authored-by: Fu Chen Signed-off-by: zky.zhoukeyong --- .github/workflows/sbt.yml | 95 +++++++++++++++---- .../celeborn/client/CommitManager.scala | 2 +- .../celeborn/client/LifecycleManager.scala | 4 +- .../client/commit/CommitHandler.scala | 2 +- .../celeborn/common/meta/DeviceInfo.scala | 2 +- .../common/metrics/MetricsSystem.scala | 2 +- .../protocol/message/ControlMessages.scala | 3 +- .../celeborn/common/util/ThreadUtils.scala | 10 +- .../celeborn/common/util/UtilsSuite.scala | 2 +- .../deploy/worker/storage/DeviceMonitor.scala | 3 +- 10 files changed, 85 insertions(+), 40 deletions(-) diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml index 5967a48dd..c4ae29268 100644 --- a/.github/workflows/sbt.yml +++ b/.github/workflows/sbt.yml @@ -38,6 +38,9 @@ jobs: - 8 - 11 - 17 + scala: + - '2.12.15' + - '2.13.5' steps: - uses: actions/checkout@v2 - name: Setup JDK ${{ matrix.java }} @@ -48,12 +51,12 @@ jobs: check-latest: false - name: Test Service with SBT run: | - build/sbt "clean; test" + build/sbt ++${{ matrix.scala }} "clean; test" - name: Upload test log if: failure() uses: actions/upload-artifact@v3 with: - name: service-${{ matrix.java }}-unit-test-log + name: service-java-${{ matrix.java }}-scala-${{ matrix.scala }}-unit-test-log path: | **/target/test-reports/** @@ -90,24 +93,74 @@ jobs: strategy: fail-fast: false matrix: - java: - - 8 - - 11 - - 17 - spark: - - '3.0' - - '3.1' - - '3.2' - - '3.3' - - '3.4' - exclude: + include: + # Spark 3.0 + - spark: '3.0' + scala: '2.12.10' + java: 8 + - spark: '3.0' + scala: '2.12.10' + java: 11 + # Spark 3.1 + - spark: '3.1' + scala: '2.12.10' + java: 8 + - spark: '3.1' + scala: '2.12.10' + java: 11 + # Spark supports scala 2.13 since 3.2.0 + # Spark 3.2 + - spark: '3.2' + scala: '2.12.15' + java: 8 + - spark: '3.2' + scala: '2.12.15' + java: 11 + - spark: '3.2' + scala: '2.13.5' + java: 8 + - spark: '3.2' + scala: '2.13.5' + java: 11 # SPARK-33772: Spark supports JDK 17 since 3.3.0 - - java: 17 - spark: '3.0' - - java: 17 - spark: '3.1' - - java: 17 - spark: '3.2' + # Spark 3.3 + - spark: '3.3' + scala: '2.12.15' + java: 8 + - spark: '3.3' + scala: '2.12.15' + java: 11 + - spark: '3.3' + scala: '2.12.15' + java: 17 + - spark: '3.3' + scala: '2.13.5' + java: 8 + - spark: '3.3' + scala: '2.13.5' + java: 11 + - spark: '3.3' + scala: '2.13.5' + java: 17 + # Spark 3.4 + - spark: '3.4' + scala: '2.12.17' + java: 8 + - spark: '3.4' + scala: '2.12.17' + java: 11 + - spark: '3.4' + scala: '2.12.17' + java: 17 + - spark: '3.4' + scala: '2.13.5' + java: 8 + - spark: '3.4' + scala: '2.13.5' + java: 11 + - spark: '3.4' + scala: '2.13.5' + java: 17 steps: - uses: actions/checkout@v2 - name: Setup JDK ${{ matrix.java }} @@ -118,12 +171,12 @@ jobs: check-latest: false - name: Test with SBT run: | - build/sbt -Pspark-${{ matrix.spark }} "clean; celeborn-spark-group/test" + build/sbt -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean; celeborn-spark-group/test" - name: Upload test log if: failure() uses: actions/upload-artifact@v3 with: - name: spark-${{ matrix.spark }}-unit-test-log + name: spark-${{ matrix.spark }}-scala-${{ matrix.scala }}-unit-test-log path: | **/target/test-reports/** diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index f211f9ba6..0baacd10e 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -117,7 +117,7 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage Math.min(workerToRequests.size, conf.clientRpcMaxParallelism) try { ThreadUtils.parmap( - workerToRequests.to, + workerToRequests, "CommitFiles", parallelism) { case (worker, requests) => diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 6e052a0b9..85748f4f8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -424,7 +424,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // Second, for each worker, try to initialize the endpoint. val parallelism = Math.min(Math.max(1, slots.size()), conf.clientRpcMaxParallelism) - ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case (workerInfo, _) => + ThreadUtils.parmap(slots.asScala, "InitWorkerRef", parallelism) { case (workerInfo, _) => try { workerInfo.endpoint = rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host, workerInfo.rpcPort), WORKER_EP) @@ -648,7 +648,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty || !p._2._2.isEmpty) val parallelism = Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism) - ThreadUtils.parmap(workerPartitionLocations.to, "ReserveSlot", parallelism) { + ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) { case (workerInfo, (primaryLocations, replicaLocations)) => val res = requestWorkerReserveSlots( workerInfo.endpoint, diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala index 6a011e670..5f17e4ef0 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala @@ -197,7 +197,7 @@ abstract class CommitHandler( val workerPartitionLocations = allocatedWorkers.asScala.filter(!_._2.isEmpty) val parallelism = Math.min(workerPartitionLocations.size, conf.clientRpcMaxParallelism) ThreadUtils.parmap( - workerPartitionLocations.to, + workerPartitionLocations, "CommitFiles", parallelism) { case (worker, partitionLocationInfo) => val primaryParts = diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index ab11a7952..6bd49a1a0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -131,7 +131,7 @@ class DiskInfo( val (emptyShuffles, nonEmptyShuffles) = shuffleAllocations.asScala.partition(_._2 == 0) s"DiskInfo(maxSlots: $maxSlots," + s" committed shuffles ${emptyShuffles.size}" + - s" shuffleAllocations: $nonEmptyShuffles," + + s" shuffleAllocations: ${nonEmptyShuffles.toMap}," + s" mountPoint: $mountPoint," + s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," + s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," + diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala index 0d20cf184..fdc1fef0b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala @@ -81,7 +81,7 @@ class MetricsSystem( } def getSourcesByName(sourceName: String): Seq[Source] = - sources.filter(_.sourceName == sourceName) + sources.filter(_.sourceName == sourceName).toSeq def registerSource(source: Source) { sources += source diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index d376832a3..89ef41a93 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -802,7 +802,8 @@ object ControlMessages extends Logging { val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]() val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption( pbHeartbeatFromWorker.getUserResourceConsumptionMap) - val pbDisks = pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo) + val pbDisks = + pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo) if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) { estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap) } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala index ca5066c34..417e3650b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala @@ -20,8 +20,6 @@ package org.apache.celeborn.common.util import java.util.concurrent._ import java.util.concurrent.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} -import scala.collection.TraversableLike -import scala.collection.generic.CanBuildFrom import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} import scala.language.higherKinds @@ -287,13 +285,7 @@ object ThreadUtils { * @return new collection in which each element was given from the input collection `in` by * applying the lambda function `f`. */ - def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]( - in: Col[I], - prefix: String, - maxThreads: Int)(f: I => O)(implicit - cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map - cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence - ): Col[O] = { + def parmap[I, O](in: Iterable[I], prefix: String, maxThreads: Int)(f: I => O): Iterable[O] = { val pool = newForkJoinPool(prefix, maxThreads) try { implicit val ec = ExecutionContext.fromExecutor(pool) diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index ff9157448..bce77cc68 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -162,7 +162,7 @@ class UtilsSuite extends CelebornFunSuite { GetReducerFileGroupResponse] assert(response.status == responseTrans.status) - assert(response.attempts.deep == responseTrans.attempts.deep) + assert(util.Arrays.equals(response.attempts, responseTrans.attempts)) val set = (response.fileGroup.values().toArray diff responseTrans.fileGroup.values().toArray).toSet assert(set.size == 0) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index aeea6c232..fbd1dc1e9 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -17,7 +17,7 @@ package org.apache.celeborn.service.deploy.worker.storage -import java.io._ +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, IOException} import java.nio.charset.Charset import java.util import java.util.concurrent.TimeUnit @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import org.apache.commons.io.FileUtils -import org.slf4j.LoggerFactory import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging