From 810a8d01e083d8df663bcd9482f0d1178fbe1ccc Mon Sep 17 00:00:00 2001 From: Shuang Date: Wed, 11 Jan 2023 11:54:50 +0800 Subject: [PATCH] [CELEBORN-212] refresh client if current client is inactive. (#1159) --- .../client/read/WorkerPartitionReader.java | 23 +++++++++++--- .../celeborn/common/util/ExceptionUtils.java | 31 +++++++++++++++++++ 2 files changed, 49 insertions(+), 5 deletions(-) create mode 100644 common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java index 9d154f58f..2a9bae7ba 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java @@ -38,11 +38,12 @@ import org.apache.celeborn.common.network.protocol.Message; import org.apache.celeborn.common.network.protocol.OpenStream; import org.apache.celeborn.common.network.protocol.StreamHandle; import org.apache.celeborn.common.protocol.PartitionLocation; +import org.apache.celeborn.common.util.ExceptionUtils; public class WorkerPartitionReader implements PartitionReader { private final Logger logger = LoggerFactory.getLogger(WorkerPartitionReader.class); private PartitionLocation location; - private final TransportClient client; + private final TransportClientFactory clientFactory; private StreamHandle streamHandle; private int returnedChunks; @@ -94,6 +95,7 @@ public class WorkerPartitionReader implements PartitionReader { exception.set(new IOException(errorMsg, e)); } }; + TransportClient client; try { client = clientFactory.createClient(location.getHost(), location.getFetchPort()); } catch (InterruptedException ie) { @@ -106,7 +108,7 @@ public class WorkerPartitionReader implements PartitionReader { streamHandle = (StreamHandle) Message.decode(response); this.location = location; - + this.clientFactory = clientFactory; this.fetchChunkRetryCnt = fetchChunkRetryCnt; this.fetchChunkMaxRetry = fetchChunkMaxRetry; testFetch = conf.testFetchFailure(); @@ -152,7 +154,7 @@ public class WorkerPartitionReader implements PartitionReader { return location; } - private void fetchChunks() { + private void fetchChunks() throws IOException { final int inFlight = chunkIndex - returnedChunks; if (inFlight < fetchMaxReqsInFlight) { final int toFetch = @@ -161,8 +163,19 @@ public class WorkerPartitionReader implements PartitionReader { if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) { callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure")); } else { - client.fetchChunk(streamHandle.streamId, chunkIndex, callback); - chunkIndex++; + try { + TransportClient client = + clientFactory.createClient(location.getHost(), location.getFetchPort()); + client.fetchChunk(streamHandle.streamId, chunkIndex, callback); + chunkIndex++; + } catch (IOException | InterruptedException e) { + logger.error( + "fetchChunk for streamId: {}, chunkIndex: {} failed.", + streamHandle.streamId, + chunkIndex, + e); + ExceptionUtils.wrapAndThrowIOException(e); + } } } } diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java new file mode 100644 index 000000000..5336f6d1e --- /dev/null +++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.util; + +import java.io.IOException; + +public class ExceptionUtils { + + public static void wrapAndThrowIOException(Exception exception) throws IOException { + if (exception instanceof IOException) { + throw (IOException) exception; + } else { + throw new IOException(exception.getMessage(), exception); + } + } +}