From 1b193aa196415bcc92d46b63fac6742de7a6616f Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Tue, 26 Nov 2024 11:06:54 +0800 Subject: [PATCH] [CELEBORN-1713] RpcTimeoutException should include RPC address in message ### What changes were proposed in this pull request? `RpcTimeoutException` now includes the RPC address in its message to help troubleshoot timeouts. ### Why are the changes needed? At present, the message of `RpcTimeoutException` does not contain the RPC address, which makes a timeout hard to troubleshoot because the remote RPC address is unknown. ``` 24/11/12 03:00:51 [Executor task launch worker for task 53432.0 in stage 0.0 (TID 53487)] ERROR Executor: Exception in task 53432.0 in stage 0.0 (TID 53487) org.apache.celeborn.common.rpc.RpcTimeoutException: Futures timed out after [120000 milliseconds]. This timeout is controlled by celeborn.rpc.lookupTimeout at org.apache.celeborn.common.rpc.RpcTimeout.org$apache$celeborn$common$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:46) at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:61) at org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:57) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:106) at org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:114) at org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1759) at org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89) at org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:239) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:144) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:316) at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74) ... 15 more ``` Therefore, `RpcTimeoutException` should include the RPC address in its message to help troubleshoot timeouts. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `NettyRpcEnvSuite#ask a message timeout on Future using RpcTimeout` Closes #2907 from SteNicholas/CELEBORN-1713. 
Authored-by: SteNicholas Signed-off-by: mingji --- .../celeborn/common/client/MasterClient.java | 2 +- .../celeborn/common/rpc/RpcEndpointRef.scala | 2 +- .../org/apache/celeborn/common/rpc/RpcEnv.scala | 2 +- .../apache/celeborn/common/rpc/RpcTimeout.scala | 16 ++++++++++------ .../celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +- .../apache/celeborn/common/rpc/RpcEnvSuite.scala | 13 ++++++++----- 6 files changed, 22 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index 8cb9bd02c..53c550797 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -151,7 +151,7 @@ public class MasterClient { try { endpointRef = getOrSetupRpcEndpointRef(currentMasterIdx); Future future = endpointRef.ask(message, rpcTimeout, ClassTag$.MODULE$.apply(clz)); - return rpcTimeout.awaitResult(future); + return rpcTimeout.awaitResult(future, endpointRef.address()); } catch (Throwable e) { throwable = e; shouldRetry = shouldRetry(endpointRef, throwable); diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala index 84f57549c..edd7005e2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala @@ -86,6 +86,6 @@ abstract class RpcEndpointRef(conf: CelebornConf) */ def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = { val future = ask[T](message, timeout) - timeout.awaitResult(future) + timeout.awaitResult(future, address) } } diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 31f69a377..7a44d8b63 100644 --- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -131,7 +131,7 @@ abstract class RpcEnv(config: RpcEnvConfig) { * Retrieve the [[RpcEndpointRef]] represented by `addr`. This is a blocking action. */ def setupEndpointRefByAddr(addr: RpcEndpointAddress): RpcEndpointRef = { - defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr)) + defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr), addr.rpcAddress) } /** diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala index 859c0e09a..ccca5614d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala @@ -42,8 +42,12 @@ private[celeborn] class RpcTimeout(val duration: FiniteDuration, val timeoutProp extends Serializable { /** Amends the standard message of TimeoutException to include the description */ - private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = { - new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " + timeoutProp, te) + private def createRpcTimeoutException( + te: TimeoutException, + rpcAddress: RpcAddress): RpcTimeoutException = { + new RpcTimeoutException( + s"${te.getMessage}. 
This timeout of rpc address $rpcAddress is controlled by $timeoutProp", + te) } /** @@ -54,11 +58,11 @@ private[celeborn] class RpcTimeout(val duration: FiniteDuration, val timeoutProp * val timeout = new RpcTimeout(5 millis, "short timeout") * Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout) */ - def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = { + def addMessageIfTimeout[T](rpcAddress: RpcAddress): PartialFunction[Throwable, T] = { // The exception has already been converted to a RpcTimeoutException so just raise it case rte: RpcTimeoutException => throw rte // Any other TimeoutException get converted to a RpcTimeoutException with modified message - case te: TimeoutException => throw createRpcTimeoutException(te) + case te: TimeoutException => throw createRpcTimeoutException(te, rpcAddress) } /** @@ -69,10 +73,10 @@ private[celeborn] class RpcTimeout(val duration: FiniteDuration, val timeoutProp * @throws RpcTimeoutException if after waiting for the specified time `future` * is still not ready */ - def awaitResult[T](future: Future[T]): T = { + def awaitResult[T](future: Future[T], rpcAddress: RpcAddress): T = { try { ThreadUtils.awaitResult(future, duration) - } catch addMessageIfTimeout + } catch addMessageIfTimeout(rpcAddress) } } diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 5d2d31f66..648393786 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -293,7 +293,7 @@ class NettyRpcEnv( case NonFatal(e) => onFailure(e) } - promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) + promise.future.mapTo[T].recover(timeout.addMessageIfTimeout(address))(ThreadUtils.sameThread) } private[celeborn] def serialize(content: Any): ByteBuffer = { diff --git 
a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala index 6011960f1..9e46c8b62 100644 --- a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala @@ -775,24 +775,26 @@ abstract class RpcEnvSuite extends CelebornFunSuite { case _: NeverReply => } }) + val rpcAddress = rpcEndpointRef.address val longTimeout = new RpcTimeout(1.second, "celeborn.rpc.long.timeout") val shortTimeout = new RpcTimeout(10.milliseconds, "celeborn.rpc.short.timeout") // Ask with immediate response, should complete successfully val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout) - val reply1 = longTimeout.awaitResult(fut1) + val reply1 = longTimeout.awaitResult(fut1, rpcAddress) assert("hello" === reply1) // Ask with a delayed response and wait for response immediately that should timeout val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout) val reply2 = intercept[RpcTimeoutException] { - shortTimeout.awaitResult(fut2) + shortTimeout.awaitResult(fut2, rpcAddress) }.getMessage // RpcTimeout.awaitResult should have added the property to the TimeoutException message - assert(reply2.contains(shortTimeout.timeoutProp)) + assert(reply2.contains( + s"This timeout of rpc address $rpcAddress is controlled by ${shortTimeout.timeoutProp}")) // Ask with delayed response and allow the Future to timeout before ThreadUtils.awaitResult val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout) @@ -808,13 +810,14 @@ abstract class RpcEnvSuite extends CelebornFunSuite { // When the future timed out, the recover callback should have used // RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message - assert(reply3.contains(shortTimeout.timeoutProp)) + assert(reply3.contains( + s"This timeout of rpc address $rpcAddress is controlled by 
${shortTimeout.timeoutProp}")) // Use RpcTimeout.awaitResult to process Future, since it has already failed with // RpcTimeoutException, the same RpcTimeoutException should be thrown val reply4 = intercept[RpcTimeoutException] { - shortTimeout.awaitResult(fut3) + shortTimeout.awaitResult(fut3, rpcAddress) }.getMessage // Ensure description is not in message twice after addMessageIfTimeout and awaitResult