[CELEBORN-1902] Read client throws PartitionConnectionException
### What changes were proposed in this pull request? `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` is thrown when RemoteBufferStreamReader finds that the current exception is about connection failure. ### Why are the changes needed? If `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` is correctly thrown to reflect connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. Otherwise, endless retries could cause Flink job failure. This PR is to deal with exceptions like: ``` java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to ltx1-app10154.prod.linkedin.com/10.88.105.20:23924 ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested in a Flink batch job with Celeborn. Closes #3147 from Austinfjq/throw-Partition-Connection-Exception. Lead-authored-by: Jinqian Fan <jinqianfan@icloud.com> Co-authored-by: Austin Fan <jinqianfan@icloud.com> Co-authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
parent
2a847ba90e
commit
f7be341948
@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
|
||||
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
|
||||
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
|
||||
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
|
||||
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
|
||||
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
|
||||
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
|
||||
import org.apache.flink.util.ExceptionUtils;
|
||||
@ -127,6 +128,7 @@ public class RemoteShuffleInputGateDelegation {
|
||||
private AvailabilityProvider.AvailabilityHelper availabilityHelper;
|
||||
private int startSubIndex;
|
||||
private int endSubIndex;
|
||||
private boolean partitionConnectionExceptionEnabled;
|
||||
|
||||
public RemoteShuffleInputGateDelegation(
|
||||
CelebornConf celebornConf,
|
||||
@ -177,6 +179,7 @@ public class RemoteShuffleInputGateDelegation {
|
||||
channelsInfo = createChannelInfos();
|
||||
this.numConcurrentReading = numConcurrentReading;
|
||||
this.availabilityHelper = availabilityHelper;
|
||||
this.partitionConnectionExceptionEnabled = celebornConf.partitionConnectionExceptionEnabled();
|
||||
LOG.debug("Initial input gate with numConcurrentReading {}", this.numConcurrentReading);
|
||||
}
|
||||
|
||||
@ -261,8 +264,13 @@ public class RemoteShuffleInputGateDelegation {
|
||||
return;
|
||||
}
|
||||
Class<?> clazz = PartitionUnRetryAbleException.class;
|
||||
if (throwable.getMessage() != null && throwable.getMessage().contains(clazz.getName())) {
|
||||
String message = throwable.getMessage();
|
||||
if (null != message && message.contains(clazz.getName())) {
|
||||
cause = new PartitionNotFoundException(rpID);
|
||||
} else if (partitionConnectionExceptionEnabled
|
||||
&& null != message
|
||||
&& message.contains("Failed to connect to")) {
|
||||
cause = new PartitionConnectionException(rpID, throwable);
|
||||
} else {
|
||||
cause = throwable;
|
||||
}
|
||||
|
||||
@ -1120,6 +1120,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
|
||||
def registerShuffleFilterExcludedWorkerEnabled: Boolean =
|
||||
get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
|
||||
def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)
|
||||
def partitionConnectionExceptionEnabled: Boolean = get(PARTITION_CONNECTION_EXCEPTION_ENABLED)
|
||||
|
||||
// //////////////////////////////////////////////////////
|
||||
// Worker //
|
||||
@ -6044,6 +6045,16 @@ object CelebornConf extends Logging {
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val PARTITION_CONNECTION_EXCEPTION_ENABLED: ConfigEntry[Boolean] =
|
||||
buildConf("celeborn.client.flink.partitionConnectionException.enabled")
|
||||
.categories("client")
|
||||
.version("0.6.0")
|
||||
.doc("If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` " +
|
||||
"would be thrown when RemoteBufferStreamReader finds that the current exception is about connection " +
|
||||
"failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val NETWORK_IO_SASL_TIMEOUT: ConfigEntry[Long] =
|
||||
buildConf("celeborn.<module>.io.saslTimeout")
|
||||
.categories("network")
|
||||
|
||||
@ -40,6 +40,7 @@ license: |
|
||||
| celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for a input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate |
|
||||
| celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate |
|
||||
| celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate |
|
||||
| celeborn.client.flink.partitionConnectionException.enabled | false | false | If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. | 0.6.0 | |
|
||||
| celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition |
|
||||
| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |
|
||||
| celeborn.client.flink.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use flink built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use flink built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.6.0 | |
|
||||
|
||||
Loading…
Reference in New Issue
Block a user