diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java index 31d61b89a..3a0d9f077 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.util.ExceptionUtils; @@ -127,6 +128,7 @@ public class RemoteShuffleInputGateDelegation { private AvailabilityProvider.AvailabilityHelper availabilityHelper; private int startSubIndex; private int endSubIndex; + private boolean partitionConnectionExceptionEnabled; public RemoteShuffleInputGateDelegation( CelebornConf celebornConf, @@ -177,6 +179,7 @@ public class RemoteShuffleInputGateDelegation { channelsInfo = createChannelInfos(); this.numConcurrentReading = numConcurrentReading; this.availabilityHelper = availabilityHelper; + this.partitionConnectionExceptionEnabled = celebornConf.partitionConnectionExceptionEnabled(); LOG.debug("Initial input gate with numConcurrentReading {}", this.numConcurrentReading); } @@ -261,8 +264,13 @@ public class RemoteShuffleInputGateDelegation { return; } Class clazz = PartitionUnRetryAbleException.class; - if (throwable.getMessage() != null && throwable.getMessage().contains(clazz.getName())) { + String message = throwable.getMessage(); + if (null != message && message.contains(clazz.getName())) { cause = new PartitionNotFoundException(rpID); + } else if (partitionConnectionExceptionEnabled + && null != message + && message.contains("Failed to connect to")) { + cause = new PartitionConnectionException(rpID, throwable); } else { cause = throwable; } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index c57fbd6e1..6140bd713 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1120,6 +1120,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def registerShuffleFilterExcludedWorkerEnabled: Boolean = get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED) def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED) + def partitionConnectionExceptionEnabled: Boolean = get(PARTITION_CONNECTION_EXCEPTION_ENABLED) // ////////////////////////////////////////////////////// // Worker // @@ -6044,6 +6045,16 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val PARTITION_CONNECTION_EXCEPTION_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.client.flink.partitionConnectionException.enabled") + .categories("client") + .version("0.6.0") + .doc("If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` " + + "would be thrown when RemoteBufferStreamReader finds that the current exception is about connection " + + "failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data.") + .booleanConf + .createWithDefault(false) + val NETWORK_IO_SASL_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn..io.saslTimeout") .categories("network") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index e4e8e0e83..6a41b129a 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -40,6 +40,7 @@ license: | | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for a input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate | | celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate | | celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate | +| celeborn.client.flink.partitionConnectionException.enabled | false | false | If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. | 0.6.0 | | | celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition | | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate | | celeborn.client.flink.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use flink built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use flink built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.6.0 | |