[CELEBORN-1713] RpcTimeoutException should include RPC address in message

### What changes were proposed in this pull request?

`RpcTimeoutException` adds RPC address in message to help troubleshooting of timeout.

### Why are the changes needed?

The message of `RpcTimeoutException` does not contain the RPC address in the message at present, which causes that the timeout problem is hard to troubleshooting for unknown rpc address.

```
24/11/12 03:00:51 [Executor task launch worker for task 53432.0 in stage 0.0 (TID 53487)] ERROR Executor: Exception in task 53432.0 in stage 0.0 (TID 53487)
org.apache.celeborn.common.rpc.RpcTimeoutException: Futures timed out after [120000 milliseconds]. This timeout is controlled by celeborn.rpc.lookupTimeout
	at org.apache.celeborn.common.rpc.RpcTimeout.org$apache$celeborn$common$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:46)
	at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:61)
	at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:57)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:106)
	at org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:114)
	at org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1759)
	at org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89)
	at org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:239)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:144)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120000 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
	at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:316)
	at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
	... 15 more
```

Therefore, `RpcTimeoutException` should include RPC address in message to help troubleshooting of timeout.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`NettyRpcEnvSuite#ask a message timeout on Future using RpcTimeout`

Closes #2907 from SteNicholas/CELEBORN-1713.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-11-26 11:06:54 +08:00 committed by mingji
parent 5fab0b5ae0
commit 1b193aa196
6 changed files with 22 additions and 15 deletions

View File

@ -151,7 +151,7 @@ public class MasterClient {
try {
endpointRef = getOrSetupRpcEndpointRef(currentMasterIdx);
Future<T> future = endpointRef.ask(message, rpcTimeout, ClassTag$.MODULE$.apply(clz));
return rpcTimeout.awaitResult(future);
return rpcTimeout.awaitResult(future, endpointRef.address());
} catch (Throwable e) {
throwable = e;
shouldRetry = shouldRetry(endpointRef, throwable);

View File

@ -86,6 +86,6 @@ abstract class RpcEndpointRef(conf: CelebornConf)
*/
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
timeout.awaitResult(future, address)
}
}

View File

@ -131,7 +131,7 @@ abstract class RpcEnv(config: RpcEnvConfig) {
* Retrieve the [[RpcEndpointRef]] represented by `addr`. This is a blocking action.
*/
def setupEndpointRefByAddr(addr: RpcEndpointAddress): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr))
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr), addr.rpcAddress)
}
/**

View File

@ -42,8 +42,12 @@ private[celeborn] class RpcTimeout(val duration: FiniteDuration, val timeoutProp
extends Serializable {
/** Amends the standard message of TimeoutException to include the description */
private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " + timeoutProp, te)
private def createRpcTimeoutException(
te: TimeoutException,
rpcAddress: RpcAddress): RpcTimeoutException = {
new RpcTimeoutException(
s"${te.getMessage}. This timeout of rpc address $rpcAddress is controlled by $timeoutProp",
te)
}
/**
@ -54,11 +58,11 @@ private[celeborn] class RpcTimeout(val duration: FiniteDuration, val timeoutProp
* val timeout = new RpcTimeout(5 millis, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
def addMessageIfTimeout[T](rpcAddress: RpcAddress): PartialFunction[Throwable, T] = {
// The exception has already been converted to a RpcTimeoutException so just raise it
case rte: RpcTimeoutException => throw rte
// Any other TimeoutException get converted to a RpcTimeoutException with modified message
case te: TimeoutException => throw createRpcTimeoutException(te)
case te: TimeoutException => throw createRpcTimeoutException(te, rpcAddress)
}
/**
@ -69,10 +73,10 @@ private[celeborn] class RpcTimeout(val duration: FiniteDuration, val timeoutProp
* @throws RpcTimeoutException if after waiting for the specified time `future`
* is still not ready
*/
def awaitResult[T](future: Future[T]): T = {
def awaitResult[T](future: Future[T], rpcAddress: RpcAddress): T = {
try {
ThreadUtils.awaitResult(future, duration)
} catch addMessageIfTimeout
} catch addMessageIfTimeout(rpcAddress)
}
}

View File

@ -293,7 +293,7 @@ class NettyRpcEnv(
case NonFatal(e) =>
onFailure(e)
}
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout(address))(ThreadUtils.sameThread)
}
private[celeborn] def serialize(content: Any): ByteBuffer = {

View File

@ -775,24 +775,26 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
case _: NeverReply =>
}
})
val rpcAddress = rpcEndpointRef.address
val longTimeout = new RpcTimeout(1.second, "celeborn.rpc.long.timeout")
val shortTimeout = new RpcTimeout(10.milliseconds, "celeborn.rpc.short.timeout")
// Ask with immediate response, should complete successfully
val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
val reply1 = longTimeout.awaitResult(fut1)
val reply1 = longTimeout.awaitResult(fut1, rpcAddress)
assert("hello" === reply1)
// Ask with a delayed response and wait for response immediately that should timeout
val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout)
val reply2 =
intercept[RpcTimeoutException] {
shortTimeout.awaitResult(fut2)
shortTimeout.awaitResult(fut2, rpcAddress)
}.getMessage
// RpcTimeout.awaitResult should have added the property to the TimeoutException message
assert(reply2.contains(shortTimeout.timeoutProp))
assert(reply2.contains(
s"This timeout of rpc address $rpcAddress is controlled by ${shortTimeout.timeoutProp}"))
// Ask with delayed response and allow the Future to timeout before ThreadUtils.awaitResult
val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
@ -808,13 +810,14 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
// When the future timed out, the recover callback should have used
// RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
assert(reply3.contains(shortTimeout.timeoutProp))
assert(reply3.contains(
s"This timeout of rpc address $rpcAddress is controlled by ${shortTimeout.timeoutProp}"))
// Use RpcTimeout.awaitResult to process Future, since it has already failed with
// RpcTimeoutException, the same RpcTimeoutException should be thrown
val reply4 =
intercept[RpcTimeoutException] {
shortTimeout.awaitResult(fut3)
shortTimeout.awaitResult(fut3, rpcAddress)
}.getMessage
// Ensure description is not in message twice after addMessageIfTimeout and awaitResult