[CELEBORN-1290] Fix NPE occurring prior to worker registration
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This PR addresses an NPE that occurs when the `Worker#registered` member is accessed before it is initialized.
The problem occurs because the `TransportChannelHandler` might be served before the worker is registered.
```
24/02/01 15:07:32,090 WARN [push-server-6-6] TransportChannelHandler: Exception in connection from /xx.xx.xx.xx:xxx
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.PushDataHandler.checkRegistered(PushDataHandler.scala:714)
at org.apache.celeborn.common.network.server.TransportRequestHandler.checkRegistered(TransportRequestHandler.java:82)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:76)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:151)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #2274 from cfmcgrady/check-registered.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
This commit is contained in:
parent
4e64ae3214
commit
fe623888bf
@ -57,7 +57,7 @@ class FetchHandler(
|
||||
conf.readBuffersToTriggerReadMin)
|
||||
var storageManager: StorageManager = _
|
||||
var partitionsSorter: PartitionFilesSorter = _
|
||||
var registered: AtomicBoolean = new AtomicBoolean(false)
|
||||
var registered: Option[AtomicBoolean] = None
|
||||
|
||||
def init(worker: Worker): Unit = {
|
||||
workerSource.addGauge(WorkerSource.ACTIVE_CHUNK_STREAM_COUNT) { () =>
|
||||
@ -74,7 +74,7 @@ class FetchHandler(
|
||||
|
||||
this.storageManager = worker.storageManager
|
||||
this.partitionsSorter = worker.partitionsSorter
|
||||
this.registered = worker.registered
|
||||
this.registered = Some(worker.registered)
|
||||
}
|
||||
|
||||
def getRawDiskFileInfo(
|
||||
@ -456,7 +456,7 @@ class FetchHandler(
|
||||
}
|
||||
}
|
||||
|
||||
override def checkRegistered: Boolean = registered.get
|
||||
override def checkRegistered: Boolean = registered.exists(_.get)
|
||||
|
||||
/** Invoked when the channel associated with the given client is active. */
|
||||
override def channelActive(client: TransportClient): Unit = {
|
||||
|
||||
@ -56,7 +56,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|
||||
private var replicateThreadPool: ThreadPoolExecutor = _
|
||||
private var unavailablePeers: ConcurrentHashMap[WorkerInfo, Long] = _
|
||||
private var replicateClientFactory: TransportClientFactory = _
|
||||
private var registered: AtomicBoolean = _
|
||||
private var registered: Option[AtomicBoolean] = None
|
||||
private var workerInfo: WorkerInfo = _
|
||||
private var diskReserveSize: Long = _
|
||||
private var diskReserveRatio: Option[Double] = _
|
||||
@ -79,7 +79,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|
||||
replicateThreadPool = worker.replicateThreadPool
|
||||
unavailablePeers = worker.unavailablePeers
|
||||
replicateClientFactory = worker.replicateClientFactory
|
||||
registered = worker.registered
|
||||
registered = Some(worker.registered)
|
||||
workerInfo = worker.workerInfo
|
||||
diskReserveSize = worker.conf.workerDiskReserveSize
|
||||
diskReserveRatio = worker.conf.workerDiskReserveRatio
|
||||
@ -707,7 +707,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
|
||||
(mapId, attemptId)
|
||||
}
|
||||
|
||||
override def checkRegistered(): Boolean = registered.get()
|
||||
override def checkRegistered(): Boolean = registered.exists(_.get)
|
||||
|
||||
class RpcResponseCallbackWithTimer(
|
||||
source: Source,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user