[CELEBORN-901] Add support for Scala 2.13

### What changes were proposed in this pull request?

This PR introduces support for Scala 2.13

1. Resolved a compilation issue specific to Scala 2.13
2. Successfully validated compatibility with Scala 2.13 through the comprehensive suite of unit tests
3. Enabled SBT CI jobs for Scala 2.13 in the "server" module and the Spark client modules

For more detailed guidance on migrating to Scala 2.13, please consult the following resources:

1. https://www.scala-lang.org/blog/2017/02/28/collections-rework.html
2. https://docs.scala-lang.org/overviews/core/collections-migration-213.html

### Why are the changes needed?

Scala 2.13 is required by newer Spark versions (Spark supports Scala 2.13 since 3.2.0), so adding cross-build support lets Celeborn's Spark client be built and tested against both Scala 2.12 and 2.13.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Passed GitHub Actions CI, including the new Scala 2.13 matrix entries for the service and Spark client test jobs.

Closes #1825 from cfmcgrady/scala213.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
Fu Chen 2023-08-22 20:35:05 +08:00 committed by zky.zhoukeyong
parent 86d78d9572
commit d6c4334a11
10 changed files with 85 additions and 40 deletions

View File

@ -38,6 +38,9 @@ jobs:
- 8
- 11
- 17
scala:
- '2.12.15'
- '2.13.5'
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
@ -48,12 +51,12 @@ jobs:
check-latest: false
- name: Test Service with SBT
run: |
build/sbt "clean; test"
build/sbt ++${{ matrix.scala }} "clean; test"
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
name: service-${{ matrix.java }}-unit-test-log
name: service-java-${{ matrix.java }}-scala-${{ matrix.scala }}-unit-test-log
path: |
**/target/test-reports/**
@ -90,24 +93,74 @@ jobs:
strategy:
fail-fast: false
matrix:
java:
- 8
- 11
- 17
spark:
- '3.0'
- '3.1'
- '3.2'
- '3.3'
- '3.4'
exclude:
include:
# Spark 3.0
- spark: '3.0'
scala: '2.12.10'
java: 8
- spark: '3.0'
scala: '2.12.10'
java: 11
# Spark 3.1
- spark: '3.1'
scala: '2.12.10'
java: 8
- spark: '3.1'
scala: '2.12.10'
java: 11
# Spark supports scala 2.13 since 3.2.0
# Spark 3.2
- spark: '3.2'
scala: '2.12.15'
java: 8
- spark: '3.2'
scala: '2.12.15'
java: 11
- spark: '3.2'
scala: '2.13.5'
java: 8
- spark: '3.2'
scala: '2.13.5'
java: 11
# SPARK-33772: Spark supports JDK 17 since 3.3.0
- java: 17
spark: '3.0'
- java: 17
spark: '3.1'
- java: 17
spark: '3.2'
# Spark 3.3
- spark: '3.3'
scala: '2.12.15'
java: 8
- spark: '3.3'
scala: '2.12.15'
java: 11
- spark: '3.3'
scala: '2.12.15'
java: 17
- spark: '3.3'
scala: '2.13.5'
java: 8
- spark: '3.3'
scala: '2.13.5'
java: 11
- spark: '3.3'
scala: '2.13.5'
java: 17
# Spark 3.4
- spark: '3.4'
scala: '2.12.17'
java: 8
- spark: '3.4'
scala: '2.12.17'
java: 11
- spark: '3.4'
scala: '2.12.17'
java: 17
- spark: '3.4'
scala: '2.13.5'
java: 8
- spark: '3.4'
scala: '2.13.5'
java: 11
- spark: '3.4'
scala: '2.13.5'
java: 17
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
@ -118,12 +171,12 @@ jobs:
check-latest: false
- name: Test with SBT
run: |
build/sbt -Pspark-${{ matrix.spark }} "clean; celeborn-spark-group/test"
build/sbt -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean; celeborn-spark-group/test"
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
name: spark-${{ matrix.spark }}-unit-test-log
name: spark-${{ matrix.spark }}-scala-${{ matrix.scala }}-unit-test-log
path: |
**/target/test-reports/**

View File

@ -117,7 +117,7 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
Math.min(workerToRequests.size, conf.clientRpcMaxParallelism)
try {
ThreadUtils.parmap(
workerToRequests.to,
workerToRequests,
"CommitFiles",
parallelism) {
case (worker, requests) =>

View File

@ -424,7 +424,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// Second, for each worker, try to initialize the endpoint.
val parallelism = Math.min(Math.max(1, slots.size()), conf.clientRpcMaxParallelism)
ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case (workerInfo, _) =>
ThreadUtils.parmap(slots.asScala, "InitWorkerRef", parallelism) { case (workerInfo, _) =>
try {
workerInfo.endpoint =
rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host, workerInfo.rpcPort), WORKER_EP)
@ -648,7 +648,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty || !p._2._2.isEmpty)
val parallelism =
Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism)
ThreadUtils.parmap(workerPartitionLocations.to, "ReserveSlot", parallelism) {
ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
case (workerInfo, (primaryLocations, replicaLocations)) =>
val res = requestWorkerReserveSlots(
workerInfo.endpoint,

View File

@ -197,7 +197,7 @@ abstract class CommitHandler(
val workerPartitionLocations = allocatedWorkers.asScala.filter(!_._2.isEmpty)
val parallelism = Math.min(workerPartitionLocations.size, conf.clientRpcMaxParallelism)
ThreadUtils.parmap(
workerPartitionLocations.to,
workerPartitionLocations,
"CommitFiles",
parallelism) { case (worker, partitionLocationInfo) =>
val primaryParts =

View File

@ -131,7 +131,7 @@ class DiskInfo(
val (emptyShuffles, nonEmptyShuffles) = shuffleAllocations.asScala.partition(_._2 == 0)
s"DiskInfo(maxSlots: $maxSlots," +
s" committed shuffles ${emptyShuffles.size}" +
s" shuffleAllocations: $nonEmptyShuffles," +
s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
s" mountPoint: $mountPoint," +
s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +

View File

@ -81,7 +81,7 @@ class MetricsSystem(
}
def getSourcesByName(sourceName: String): Seq[Source] =
sources.filter(_.sourceName == sourceName)
sources.filter(_.sourceName == sourceName).toSeq
def registerSource(source: Source) {
sources += source

View File

@ -802,7 +802,8 @@ object ControlMessages extends Logging {
val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]()
val userResourceConsumption = PbSerDeUtils.fromPbUserResourceConsumption(
pbHeartbeatFromWorker.getUserResourceConsumptionMap)
val pbDisks = pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo)
val pbDisks =
pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo)
if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) {
estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap)
}

View File

@ -20,8 +20,6 @@ package org.apache.celeborn.common.util
import java.util.concurrent._
import java.util.concurrent.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.language.higherKinds
@ -287,13 +285,7 @@ object ThreadUtils {
* @return new collection in which each element was given from the input collection `in` by
* applying the lambda function `f`.
*/
def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]](
in: Col[I],
prefix: String,
maxThreads: Int)(f: I => O)(implicit
cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
): Col[O] = {
def parmap[I, O](in: Iterable[I], prefix: String, maxThreads: Int)(f: I => O): Iterable[O] = {
val pool = newForkJoinPool(prefix, maxThreads)
try {
implicit val ec = ExecutionContext.fromExecutor(pool)

View File

@ -162,7 +162,7 @@ class UtilsSuite extends CelebornFunSuite {
GetReducerFileGroupResponse]
assert(response.status == responseTrans.status)
assert(response.attempts.deep == responseTrans.attempts.deep)
assert(util.Arrays.equals(response.attempts, responseTrans.attempts))
val set =
(response.fileGroup.values().toArray diff responseTrans.fileGroup.values().toArray).toSet
assert(set.size == 0)

View File

@ -17,7 +17,7 @@
package org.apache.celeborn.service.deploy.worker.storage
import java.io._
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, IOException}
import java.nio.charset.Charset
import java.util
import java.util.concurrent.TimeUnit
@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.slf4j.LoggerFactory
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging