diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala index f647dd29e..e0cc31a06 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala @@ -21,7 +21,7 @@ import org.apache.spark.{InterruptibleIterator, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.shuffle.{ShuffleReader, ShuffleReadMetricsReporter} import org.apache.spark.sql.execution.UnsafeRowSerializer -import org.apache.spark.sql.execution.columnar.{RssBatchBuilder, RssColumnarBatchBuilder, RssColumnarBatchSerializer} +import org.apache.spark.sql.execution.columnar.{RssBatchBuilder, RssColumnarBatchSerializer} import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 77dcfc559..779dec0ef 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.client import java.util -import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet} +import java.util.{function, List => JList} import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} import scala.collection.JavaConverters._ @@ -297,6 +297,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin attemptId, partitionId, numMappers) + case _ => + throw new UnsupportedOperationException(s"Not support $partitionType yet") } case GetReducerFileGroup(applicationId: String, shuffleId: Int) => @@ -340,6 +342,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin initialLocs) case PartitionType.REDUCE => context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, initialLocs)) + case _ => + throw new UnsupportedOperationException(s"Not support $partitionType yet") } return } @@ -398,6 +402,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin context.reply(response) } case PartitionType.REDUCE => context.reply(response) + case _ => + throw new UnsupportedOperationException(s"Not support $partitionType yet") } })) registeringShuffleRequest.remove(shuffleId) @@ -406,7 +412,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin // First, request to get allocated slots from Master val ids = new util.ArrayList[Integer](numPartitions) - (0 until numPartitions).foreach(idx => ids.add(new Integer(idx))) + (0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx))) val res = requestMasterRequestSlotsWithRetry(applicationId, shuffleId, ids) res.status match { diff --git a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala index 6dc635078..f8ae855f2 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.client import java.util -import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, ScheduledFuture, TimeUnit} +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit} import scala.collection.JavaConverters._ import scala.concurrent.duration.DurationInt diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala index b88af9723..1b9a9e5f2 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala @@ -26,7 +26,7 @@ import scala.collection.mutable import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker} import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo -import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers, ShuffleFileGroups} +import org.apache.celeborn.client.LifecycleManager.{ShuffleFailedWorkers, ShuffleFileGroups} import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo} diff --git a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala index 76fd10f09..0dff64d4d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala +++ b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala @@ -22,7 +22,6 @@ import java.util.regex.PatternSyntaxException import scala.util.matching.Regex -import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.network.util.ByteUnit import org.apache.celeborn.common.util.{JavaUtils, Utils} diff --git a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala index ebd391399..9360688b4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala +++ b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala @@ -17,8 +17,6 @@ package org.apache.celeborn.common.internal.config -import java.util.concurrent.ConcurrentHashMap - import org.apache.celeborn.common.internal.config.ConfigHelpers.AlternativesTransfer import org.apache.celeborn.common.util.JavaUtils diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala index b40bc2917..6e661ef8a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala @@ -19,7 +19,6 @@ package org.apache.celeborn.common.meta import java.util import java.util.concurrent.ConcurrentHashMap -import java.util.stream.Collectors import scala.collection.JavaConverters._ diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index 407bb960a..02f036bcb 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -18,8 +18,6 @@ package org.apache.celeborn.common.meta import java.util -import java.util.Objects -import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala index 9f60c5cd1..0d20cf184 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala @@ -109,7 +109,7 @@ class MetricsSystem( sourceConfigs.foreach { kv => val classPath = kv._2.getProperty("class") try { - val source = Utils.classForName(classPath).newInstance() + val source = Utils.classForName(classPath).getDeclaredConstructor().newInstance() registerSource(source.asInstanceOf[Source]) } catch { case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index a445d74c7..d28986bb0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -421,7 +421,7 @@ class TimerSupplier(val slidingWindowSize: Int) class GaugeSupplier[T](f: Unit => T) extends MetricRegistry.MetricSupplier[Gauge[_]] { override def newMetric(): Gauge[T] = { new Gauge[T] { - override def getValue: T = f() + override def getValue: T = f(()) } } } diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala index a6e2af925..374024dd2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala @@ -37,6 +37,7 @@ class JVMSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r .map { x => x.getMetrics.asScala.map { case (name: String, metric: Gauge[_]) => addGauge(name, metric) + case (name, metric) => new IllegalArgumentException(s"Unknown metric type: $name: $metric") } } // start cleaner diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 438369a42..7e26561ce 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -574,7 +574,7 @@ object ControlMessages extends Logging { PbFileGroup.newBuilder().addAllLocations(fileGroup.asScala.map(PbSerDeUtils .toPbPartitionLocation).toList.asJava).build()) }.asJava) - builder.addAllAttempts(attempts.map(new Integer(_)).toIterable.asJava) + builder.addAllAttempts(attempts.map(Integer.valueOf).toIterable.asJava) builder.addAllPartitionIds(partitionIds) val payload = builder.build().toByteArray new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, payload) @@ -734,7 +734,7 @@ object ControlMessages extends Logging { .setShuffleId(shuffleId) .addAllMasterIds(masterIds) .addAllSlaveIds(slaveIds) - .addAllMapAttempts(mapAttempts.map(new Integer(_)).toIterable.asJava) + .addAllMapAttempts(mapAttempts.map(Integer.valueOf).toIterable.asJava) .setEpoch(epoch) .build().toByteArray new TransportMessage(MessageType.COMMIT_FILES, payload) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala index 71c75cf28..8eef121f4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala @@ -17,6 +17,8 @@ package org.apache.celeborn.common.util +import scala.language.implicitConversions + /** * Implicit conversion for scala(2.11) function to java function */ diff --git a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala index d4cb4417b..ca5066c34 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala @@ -18,12 +18,12 @@ package org.apache.celeborn.common.util import java.util.concurrent._ +import java.util.concurrent.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.collection.TraversableLike import scala.collection.generic.CanBuildFrom import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.language.higherKinds import scala.util.control.NonFatal diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index dd835c35b..f5fa1bfe3 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -27,7 +27,7 @@ import java.nio.charset.StandardCharsets import java.text.SimpleDateFormat import java.util import java.util.{Locale, Properties, Random, UUID} -import java.util.concurrent.{Callable, ConcurrentHashMap, ThreadPoolExecutor, TimeoutException, TimeUnit} +import java.util.concurrent.{Callable, ThreadPoolExecutor, TimeoutException, TimeUnit} import scala.collection.JavaConverters._ import scala.reflect.ClassTag diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala index 2a8b41b0c..8ccf1d295 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala @@ -19,7 +19,7 @@ package org.apache.celeborn.common.meta import java.util import java.util.{Map => jMap} -import java.util.concurrent.{ConcurrentHashMap, Future, ThreadLocalRandom} +import java.util.concurrent.{Future, ThreadLocalRandom} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 386038019..563580101 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -26,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.util.Random import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.exception.CelebornRuntimeException import org.apache.celeborn.common.haclient.RssHARetryClient import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala index 7b0d07e70..331e9a8b3 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala @@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.master import scala.annotation.tailrec import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.CelebornConf._ import org.apache.celeborn.common.util.{IntParam, Utils} import org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterClusterInfo diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala index 0ad315704..a1ac2b67e 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala @@ -20,8 +20,6 @@ package org.apache.celeborn.service.deploy.master.clustermeta.ha import java.io.IOException import java.net.{InetAddress, InetSocketAddress} -import scala.util.{Failure, Success} - import org.apache.ratis.util.NetUtils import org.apache.celeborn.common.internal.Logging diff --git a/pom.xml b/pom.xml index 1ecf80ce8..f7d84964e 100644 --- a/pom.xml +++ b/pom.xml @@ -653,6 +653,15 @@ net.alchim31.maven scala-maven-plugin ${maven.plugin.scala.version} + + + -unchecked + -deprecation + -feature + -explaintypes + -Xfatal-warnings + + scala-compile-first diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala index 463380855..fbf97665b 100644 --- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala +++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala @@ -60,7 +60,6 @@ class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature "org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory") configuration.setString("celeborn.master.endpoints", "localhost:9097") configuration.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING") - configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH) configuration.setString("taskmanager.memory.network.min", "1024m") configuration.setString(RestOptions.BIND_PORT, "8081-8089") @@ -70,7 +69,6 @@ class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration) env.getConfig.setExecutionMode(ExecutionMode.BATCH) env.getConfig.setParallelism(parallelism) - env.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL) env.disableOperatorChaining() // make parameters available in the web interface WordCountHelper.execute(env, parallelism) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 91ddc16f3..3695ba20d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -94,13 +94,16 @@ class PushDataHandler extends BaseMessageHandler with Logging { client, pushData.requestId, pushData.shuffleKey) - shufflePartitionType.getOrDefault(pushData.shuffleKey, PartitionType.REDUCE) match { + val partitionType = + shufflePartitionType.getOrDefault(pushData.shuffleKey, PartitionType.REDUCE) + partitionType match { case PartitionType.REDUCE => handlePushData( pushData, callback) case PartitionType.MAP => handleMapPartitionPushData( pushData, callback) + case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet") } }) case pushMergedData: PushMergedData => @@ -843,6 +846,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime) case Type.REGION_FINISH => (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime) + case _ => throw new IllegalArgumentException(s"Not support $messageType yet") } val location = @@ -891,6 +895,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { message.asInstanceOf[RegionStart].isBroadcast) case Type.REGION_FINISH => fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish() + case _ => throw new IllegalArgumentException(s"Not support $messageType yet") } // for master, send data to slave if (location.hasPeer && isMaster) { @@ -1004,6 +1009,7 @@ class PushDataHandler extends BaseMessageHandler with Logging { case Type.REGION_FINISH => ( StatusCode.REGION_FINISH_FAIL_MASTER, StatusCode.REGION_FINISH_FAIL_SLAVE) + case _ => throw new IllegalArgumentException(s"Not support $messageType yet") } callback.onFailure(new CelebornIOException( if (isMaster) messageMaster else messageSlave, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 3c0267fa1..62cf7a3b5 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -311,6 +311,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs splitThreshold, splitMode, rangeReadFilter) + case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet") } fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo) @@ -354,6 +355,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs splitThreshold, splitMode, rangeReadFilter) + case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet") } deviceMonitor.registerFileWriter(fileWriter) val map = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc)