[MINOR] Cleanup code

### What changes were proposed in this pull request?
1. Use `<arg>-Ywarn-unused-import</arg>` to remove some unused imports
There is no way to use `<arg>-Ywarn-unused-import</arg>` at this stage
Because we have the following code
```
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
```
2. Fix scala case match not fully covered, avoid `scala.MatchError`
3. Fixed some scala compilation warnings

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1600 from cxzl25/cleanup_code.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
sychen 2023-06-19 11:31:51 +08:00 committed by zky.zhoukeyong
parent 4cb4701ede
commit e734ceb558
23 changed files with 39 additions and 25 deletions

View File

@ -21,7 +21,7 @@ import org.apache.spark.{InterruptibleIterator, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.{ShuffleReader, ShuffleReadMetricsReporter}
import org.apache.spark.sql.execution.UnsafeRowSerializer
import org.apache.spark.sql.execution.columnar.{RssBatchBuilder, RssColumnarBatchBuilder, RssColumnarBatchSerializer}
import org.apache.spark.sql.execution.columnar.{RssBatchBuilder, RssColumnarBatchSerializer}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter

View File

@ -18,7 +18,7 @@
package org.apache.celeborn.client
import java.util
import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
import java.util.{function, List => JList}
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
@ -297,6 +297,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
attemptId,
partitionId,
numMappers)
case _ =>
throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
case GetReducerFileGroup(applicationId: String, shuffleId: Int) =>
@ -340,6 +342,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
initialLocs)
case PartitionType.REDUCE =>
context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, initialLocs))
case _ =>
throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
return
}
@ -398,6 +402,8 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
context.reply(response)
}
case PartitionType.REDUCE => context.reply(response)
case _ =>
throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
}))
registeringShuffleRequest.remove(shuffleId)
@ -406,7 +412,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
// First, request to get allocated slots from Master
val ids = new util.ArrayList[Integer](numPartitions)
(0 until numPartitions).foreach(idx => ids.add(new Integer(idx)))
(0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
val res = requestMasterRequestSlotsWithRetry(applicationId, shuffleId, ids)
res.status match {

View File

@ -18,7 +18,7 @@
package org.apache.celeborn.client
import java.util
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt

View File

@ -26,7 +26,7 @@ import scala.collection.mutable
import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers, ShuffleFileGroups}
import org.apache.celeborn.client.LifecycleManager.{ShuffleFailedWorkers, ShuffleFileGroups}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}

View File

@ -22,7 +22,6 @@ import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.network.util.ByteUnit
import org.apache.celeborn.common.util.{JavaUtils, Utils}

View File

@ -17,8 +17,6 @@
package org.apache.celeborn.common.internal.config
import java.util.concurrent.ConcurrentHashMap
import org.apache.celeborn.common.internal.config.ConfigHelpers.AlternativesTransfer
import org.apache.celeborn.common.util.JavaUtils

View File

@ -19,7 +19,6 @@ package org.apache.celeborn.common.meta
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Collectors
import scala.collection.JavaConverters._

View File

@ -18,8 +18,6 @@
package org.apache.celeborn.common.meta
import java.util
import java.util.Objects
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

View File

@ -109,7 +109,7 @@ class MetricsSystem(
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).newInstance()
val source = Utils.classForName(classPath).getDeclaredConstructor().newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)

View File

@ -421,7 +421,7 @@ class TimerSupplier(val slidingWindowSize: Int)
class GaugeSupplier[T](f: Unit => T) extends MetricRegistry.MetricSupplier[Gauge[_]] {
override def newMetric(): Gauge[T] = {
new Gauge[T] {
override def getValue: T = f()
override def getValue: T = f(())
}
}
}

View File

@ -37,6 +37,7 @@ class JVMSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
.map { x =>
x.getMetrics.asScala.map {
case (name: String, metric: Gauge[_]) => addGauge(name, metric)
case (name, metric) => new IllegalArgumentException(s"Unknown metric type: $name: $metric")
}
}
// start cleaner

View File

@ -574,7 +574,7 @@ object ControlMessages extends Logging {
PbFileGroup.newBuilder().addAllLocations(fileGroup.asScala.map(PbSerDeUtils
.toPbPartitionLocation).toList.asJava).build())
}.asJava)
builder.addAllAttempts(attempts.map(new Integer(_)).toIterable.asJava)
builder.addAllAttempts(attempts.map(Integer.valueOf).toIterable.asJava)
builder.addAllPartitionIds(partitionIds)
val payload = builder.build().toByteArray
new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, payload)
@ -734,7 +734,7 @@ object ControlMessages extends Logging {
.setShuffleId(shuffleId)
.addAllMasterIds(masterIds)
.addAllSlaveIds(slaveIds)
.addAllMapAttempts(mapAttempts.map(new Integer(_)).toIterable.asJava)
.addAllMapAttempts(mapAttempts.map(Integer.valueOf).toIterable.asJava)
.setEpoch(epoch)
.build().toByteArray
new TransportMessage(MessageType.COMMIT_FILES, payload)

View File

@ -17,6 +17,8 @@
package org.apache.celeborn.common.util
import scala.language.implicitConversions
/**
* Implicit conversion for scala(2.11) function to java function
*/

View File

@ -18,12 +18,12 @@
package org.apache.celeborn.common.util
import java.util.concurrent._
import java.util.concurrent.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.language.higherKinds
import scala.util.control.NonFatal

View File

@ -27,7 +27,7 @@ import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent.{Callable, ConcurrentHashMap, ThreadPoolExecutor, TimeoutException, TimeUnit}
import java.util.concurrent.{Callable, ThreadPoolExecutor, TimeoutException, TimeUnit}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

View File

@ -19,7 +19,7 @@ package org.apache.celeborn.common.meta
import java.util
import java.util.{Map => jMap}
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadLocalRandom}
import java.util.concurrent.{Future, ThreadLocalRandom}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

View File

@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornRuntimeException
import org.apache.celeborn.common.haclient.RssHARetryClient
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging

View File

@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.master
import scala.annotation.tailrec
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf._
import org.apache.celeborn.common.util.{IntParam, Utils}
import org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterClusterInfo

View File

@ -20,8 +20,6 @@ package org.apache.celeborn.service.deploy.master.clustermeta.ha
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress}
import scala.util.{Failure, Success}
import org.apache.ratis.util.NetUtils
import org.apache.celeborn.common.internal.Logging

View File

@ -653,6 +653,15 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${maven.plugin.scala.version}</version>
<configuration>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-explaintypes</arg>
<arg>-Xfatal-warnings</arg>
</args>
</configuration>
<executions>
<execution>
<id>scala-compile-first</id>

View File

@ -60,7 +60,6 @@ class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
"org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory")
configuration.setString("celeborn.master.endpoints", "localhost:9097")
configuration.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING")
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
configuration.setString("taskmanager.memory.network.min", "1024m")
configuration.setString(RestOptions.BIND_PORT, "8081-8089")
@ -70,7 +69,6 @@ class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
env.getConfig.setExecutionMode(ExecutionMode.BATCH)
env.getConfig.setParallelism(parallelism)
env.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL)
env.disableOperatorChaining()
// make parameters available in the web interface
WordCountHelper.execute(env, parallelism)

View File

@ -94,13 +94,16 @@ class PushDataHandler extends BaseMessageHandler with Logging {
client,
pushData.requestId,
pushData.shuffleKey)
shufflePartitionType.getOrDefault(pushData.shuffleKey, PartitionType.REDUCE) match {
val partitionType =
shufflePartitionType.getOrDefault(pushData.shuffleKey, PartitionType.REDUCE)
partitionType match {
case PartitionType.REDUCE => handlePushData(
pushData,
callback)
case PartitionType.MAP => handleMapPartitionPushData(
pushData,
callback)
case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
})
case pushMergedData: PushMergedData =>
@ -843,6 +846,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
(WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
case Type.REGION_FINISH =>
(WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
val location =
@ -891,6 +895,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
message.asInstanceOf[RegionStart].isBroadcast)
case Type.REGION_FINISH =>
fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
// for master, send data to slave
if (location.hasPeer && isMaster) {
@ -1004,6 +1009,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
case Type.REGION_FINISH => (
StatusCode.REGION_FINISH_FAIL_MASTER,
StatusCode.REGION_FINISH_FAIL_SLAVE)
case _ => throw new IllegalArgumentException(s"Not support $messageType yet")
}
callback.onFailure(new CelebornIOException(
if (isMaster) messageMaster else messageSlave,

View File

@ -311,6 +311,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
splitThreshold,
splitMode,
rangeReadFilter)
case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, fileInfo)
@ -354,6 +355,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
splitThreshold,
splitMode,
rangeReadFilter)
case _ => throw new UnsupportedOperationException(s"Not support $partitionType yet")
}
deviceMonitor.registerFileWriter(fileWriter)
val map = workingDirWriters.computeIfAbsent(dir, workingDirWriterListFunc)