[CELEBORN-332] Unify the log of ShuffleClientImpl (#1267)

* [CELEBORN-332] Unify the log of ShuffleClientImpl
This commit is contained in:
Angerszhuuuu 2023-02-24 14:07:25 +08:00 committed by GitHub
parent af9e8366c9
commit 81f7ffd767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -187,11 +187,23 @@ public class ShuffleClientImpl extends ShuffleClient {
new CelebornIOException("Revive Failed, remain revive times " + remainReviveTimes));
} else if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId);
"Revive for push data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.",
shuffleId,
mapId,
attemptId,
partitionId,
batchId);
pushState.removeBatch(batchId, loc.hostAndPushPort());
} else {
PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(partitionId);
logger.info("Revive success, new location for reduce {} is {}.", partitionId, newLoc);
logger.info(
"Revive for push data success, new location for shuffle {} map {} attempt {} partition {} batch {} is location {}.",
shuffleId,
mapId,
attemptId,
partitionId,
batchId,
newLoc);
try {
if (!testRetryRevive || remainReviveTimes < 1) {
TransportClient client =
@ -207,12 +219,14 @@ public class ShuffleClientImpl extends ShuffleClient {
"Mock push data submit retry failed. remainReviveTimes = " + remainReviveTimes + ".");
}
} catch (Exception e) {
logger.warn(
"Exception raised while pushing data for shuffle {} map {} attempt {}" + " batch {}.",
logger.error(
"Exception raised while pushing data for shuffle {} map {} attempt {} partition {} batch {} location {}.",
shuffleId,
mapId,
attemptId,
partitionId,
batchId,
newLoc,
e);
wrappedCallback.onFailure(
new Exception(
@ -260,10 +274,22 @@ public class ShuffleClientImpl extends ShuffleClient {
}
} else if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId);
"Revive for push merged data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.",
shuffleId,
mapId,
attemptId,
partitionId,
oldGroupedBatchId);
} else {
PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(partitionId);
logger.info("Revive success, new location for reduce {} is {}.", partitionId, newLoc);
logger.info(
"Revive for push merged data success, new location for shuffle {} map {} attempt {} partition {} batch {} is location {}.",
shuffleId,
mapId,
attemptId,
partitionId,
oldGroupedBatchId,
newLoc);
DataBatches newDataBatches =
newDataBatchesMap.computeIfAbsent(genAddressPair(newLoc), (s) -> new DataBatches());
newDataBatches.addDataBatch(newLoc, batch.batchId, batch.body);
@ -330,10 +356,12 @@ public class ShuffleClientImpl extends ShuffleClient {
String appId, int shuffleId, int numMappers, int mapId, int attemptId) throws IOException {
int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId);
logger.info(
"register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}",
"Register MapPartition task for shuffle {} map {} attempt {} partition {} with {} mapper.",
shuffleId,
mapId,
attemptId,
partitionId);
partitionId,
numMappers);
ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
registerShuffleInternal(
shuffleId,
@ -386,17 +414,17 @@ public class ShuffleClientImpl extends ShuffleClient {
return result;
} else if (StatusCode.SLOT_NOT_AVAILABLE.equals(respStatus)) {
logger.error(
"LifecycleManager request slots return {}, retry again, remain retry times {}",
"LifecycleManager request slots return {}, retry again, remain retry times {}.",
StatusCode.SLOT_NOT_AVAILABLE,
numRetries - 1);
} else if (StatusCode.RESERVE_SLOTS_FAILED.equals(respStatus)) {
logger.error(
"LifecycleManager request slots return {}, retry again, remain retry times {}",
"LifecycleManager request slots return {}, retry again, remain retry times {}.",
StatusCode.RESERVE_SLOTS_FAILED,
numRetries - 1);
} else {
logger.error(
"LifecycleManager request slots return {}, retry again, remain retry times {}",
"LifecycleManager request slots return {}, retry again, remain retry times {}.",
StatusCode.REQUEST_FAILED,
numRetries - 1);
}
@ -426,7 +454,8 @@ public class ShuffleClientImpl extends ShuffleClient {
boolean reachLimit = pushState.limitMaxInFlight(hostAndPushPort);
if (reachLimit) {
throw new CelebornIOException("wait timeout for task " + mapKey, pushState.exception.get());
throw new CelebornIOException(
"Waiting timeout for task " + mapKey, pushState.exception.get());
}
}
@ -434,7 +463,8 @@ public class ShuffleClientImpl extends ShuffleClient {
boolean reachLimit = pushState.limitZeroInFlight();
if (reachLimit) {
throw new CelebornIOException("wait timeout for task " + mapKey, pushState.exception.get());
throw new CelebornIOException(
"Waiting timeout for task " + mapKey, pushState.exception.get());
}
}
@ -450,7 +480,7 @@ public class ShuffleClientImpl extends ShuffleClient {
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException e) {
logger.warn("Wait revived location interrupted", e);
logger.error("Waiting revived location was interrupted.", e);
Thread.currentThread().interrupt();
}
}
@ -471,10 +501,10 @@ public class ShuffleClientImpl extends ShuffleClient {
ConcurrentHashMap<Integer, PartitionLocation> map = reducePartitionMap.get(shuffleId);
if (waitRevivedLocation(map, partitionId, epoch)) {
logger.debug(
"Has already revived for shuffle {} map {} reduce {} epoch {},"
+ " just return(Assume revive successfully).",
"Revive already success for shuffle {} map {} attempt {} partition {} epoch {}, just return true(Assume revive successfully).",
shuffleId,
mapId,
attemptId,
partitionId,
epoch);
return true;
@ -482,10 +512,11 @@ public class ShuffleClientImpl extends ShuffleClient {
String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"The mapper(shuffle {} map {}) has already ended, just return(Assume"
+ " revive successfully).",
"Revive success, but the mapper ended for shuffle {} map {} attempt {} partition {}, just return true(Assume revive successfully).",
shuffleId,
mapId);
mapId,
attemptId,
partitionId);
return true;
}
@ -509,6 +540,12 @@ public class ShuffleClientImpl extends ShuffleClient {
map.put(partitionId, PbSerDeUtils.fromPbPartitionLocation(response.getLocation()));
return true;
} else if (StatusCode.MAP_ENDED.equals(respStatus)) {
logger.debug(
"Revive success, but the mapper ended for shuffle {} map {} attempt {} partition {}, just return true(Assume revive successfully).",
shuffleId,
mapId,
attemptId,
partitionId);
mapperEndMap.computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet()).add(mapKey);
return true;
} else {
@ -516,8 +553,10 @@ public class ShuffleClientImpl extends ShuffleClient {
}
} catch (Exception e) {
logger.error(
"Exception raised while reviving for shuffle {} reduce {} epoch {}.",
"Exception raised while reviving for shuffle {} map {} attempt {} partition {} epoch {}.",
shuffleId,
mapId,
attemptId,
partitionId,
epoch,
e);
@ -544,10 +583,11 @@ public class ShuffleClientImpl extends ShuffleClient {
// return if shuffle stage already ended
if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.",
"Push or merge data ignored because mapper already ended for shuffle {} map {} attempt {} partition {}.",
shuffleId,
mapId,
attemptId);
attemptId,
partitionId);
PushState pushState = pushStates.get(mapKey);
if (pushState != null) {
pushState.cleanup();
@ -559,7 +599,7 @@ public class ShuffleClientImpl extends ShuffleClient {
getPartitionLocation(applicationId, shuffleId, numMappers, numPartitions);
if (map == null) {
throw new CelebornIOException("Register shuffle failed for shuffle " + shuffleKey);
throw new CelebornIOException("Register shuffle failed for shuffle " + shuffleKey + ".");
}
// get location
@ -575,16 +615,17 @@ public class ShuffleClientImpl extends ShuffleClient {
null,
StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE)) {
throw new CelebornIOException(
"Revive for shuffle " + shuffleKey + " partitionId " + partitionId + " failed.");
"Revive for shuffle " + shuffleKey + " partition " + partitionId + " failed.");
}
}
if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.",
"Push or merge data ignored because mapper already ended for shuffle {} map {} attempt {} partition {}.",
shuffleId,
mapId,
attemptId);
attemptId,
partitionId);
PushState pushState = pushStates.get(mapKey);
if (pushState != null) {
pushState.cleanup();
@ -622,14 +663,6 @@ public class ShuffleClientImpl extends ShuffleClient {
compressor.getCompressedBuffer(), 0, body, BATCH_HEADER_SIZE, compressedTotalSize);
if (doPush) {
logger.debug(
"Do push data for app {} shuffle {} map {} attempt {} reduce {} batch {}.",
applicationId,
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
// check limit
limitMaxInFlight(mapKey, pushState, loc.hostAndPushPort());
@ -654,11 +687,12 @@ public class ShuffleClientImpl extends ShuffleClient {
.add(mapKey);
}
logger.debug(
"Push data to {}:{} success for map {} attempt {} batch {}.",
loc.getHost(),
loc.getPushPort(),
"Push data to {} success for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
}
@ -667,11 +701,12 @@ public class ShuffleClientImpl extends ShuffleClient {
pushState.exception.compareAndSet(
null, new CelebornIOException("Revived PushData failed!", e));
logger.error(
"Push data to {}:{} failed for map {} attempt {} batch {}.",
loc.getHost(),
loc.getPushPort(),
"Push data to {} failed for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId,
e);
}
@ -687,18 +722,24 @@ public class ShuffleClientImpl extends ShuffleClient {
byte reason = response.get();
if (reason == StatusCode.SOFT_SPLIT.getValue()) {
logger.debug(
"Push data split required for map {} attempt {} batch {}",
"Push data to {} soft split required for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
splitPartition(shuffleId, partitionId, applicationId, loc);
pushState.onSuccess(loc.hostAndPushPort());
callback.onSuccess(response);
} else if (reason == StatusCode.HARD_SPLIT.getValue()) {
logger.debug(
"Push data split for map {} attempt {} batch {}.",
"Push data to {} hard split required for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
pushDataRetryPool.submit(
() ->
@ -716,17 +757,23 @@ public class ShuffleClientImpl extends ShuffleClient {
remainReviveTimes));
} else if (reason == StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED.getValue()) {
logger.debug(
"Push data split for map {} attempt {} batch {} return master congested.",
"Push data to {} master congestion required for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
pushState.onCongestControl(loc.hostAndPushPort());
callback.onSuccess(response);
} else if (reason == StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED.getValue()) {
logger.debug(
"Push data split for map {} attempt {} batch {} return slave congested.",
"Push data to {} slave congestion required for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
pushState.onCongestControl(loc.hostAndPushPort());
callback.onSuccess(response);
@ -763,11 +810,12 @@ public class ShuffleClientImpl extends ShuffleClient {
}
logger.error(
"Push data to {}:{} failed for map {} attempt {} batch {}.",
loc.getHost(),
loc.getPushPort(),
"Push data to {} failed for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId,
e);
// async retry push data
@ -790,10 +838,12 @@ public class ShuffleClientImpl extends ShuffleClient {
} else {
pushState.removeBatch(nextBatchId, loc.hostAndPushPort());
logger.info(
"Mapper shuffleId:{} mapId:{} attempt:{} already ended, remove batchId:{}.",
"Push data to {} failed but mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.",
loc.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId);
}
}
@ -812,7 +862,15 @@ public class ShuffleClientImpl extends ShuffleClient {
new RuntimeException("Mock push data first time failed.")));
}
} catch (Exception e) {
logger.warn("PushData failed", e);
logger.error(
"Exception raised while pushing data for shuffle {} map {} attempt {} partition {} batch {} location {}.",
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId,
loc,
e);
wrappedCallback.onFailure(
new Exception(
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
@ -852,7 +910,7 @@ public class ShuffleClientImpl extends ShuffleClient {
synchronized (splittingSet) {
if (splittingSet.contains(partitionId)) {
logger.debug(
"shuffle {} partitionId {} is splitting, skip split request ", shuffleId, partitionId);
"Splitting for shuffle {} partition {}, skip split request.", shuffleId, partitionId);
return;
}
splittingSet.add(partitionId);
@ -981,6 +1039,7 @@ public class ShuffleClientImpl extends ShuffleClient {
pushState.addBatch(groupedBatchId, hostPort);
final int numBatches = batches.size();
final Integer[] partitionIds = new Integer[numBatches];
final String[] partitionUniqueIds = new String[numBatches];
final int[] offsets = new int[numBatches];
final int[] batchIds = new int[numBatches];
@ -988,6 +1047,7 @@ public class ShuffleClientImpl extends ShuffleClient {
CompositeByteBuf byteBuf = Unpooled.compositeBuffer();
for (int i = 0; i < numBatches; i++) {
DataBatches.DataBatch batch = batches.get(i);
partitionIds[i] = batch.loc.getId();
partitionUniqueIds[i] = batch.loc.getUniqueId();
offsets[i] = currentSize;
batchIds[i] = batch.batchId;
@ -1004,10 +1064,14 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void onSuccess(ByteBuffer response) {
logger.debug(
"Push data success for map {} attempt {} grouped batch {}.",
"Push merged data to {} success for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
hostPort,
shuffleId,
mapId,
attemptId,
groupedBatchId);
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds));
pushState.removeBatch(groupedBatchId, hostPort);
// TODO Need to adjust maxReqsInFlight if server response is congested, see CELEBORN-62
if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
@ -1022,21 +1086,30 @@ public class ShuffleClientImpl extends ShuffleClient {
String errorMsg =
(remainReviveTimes < maxReviveTimes ? "Revived push" : "Push")
+ " merged data to "
+ host
+ ":"
+ port
+ hostPort
+ " failed for map "
+ mapId
+ " attempt "
+ attemptId
+ " batches "
+ " partition "
+ Arrays.toString(partitionIds)
+ " groupedBatch "
+ groupedBatchId
+ " batch "
+ Arrays.toString(batchIds)
+ ".";
pushState.exception.compareAndSet(null, new CelebornIOException(errorMsg, e));
if (logger.isDebugEnabled()) {
for (int batchId : batchIds) {
for (int i = 0; i < numBatches; i++) {
logger.debug(
"Push data failed for map {} attempt {} batch {}.", mapId, attemptId, batchId);
"Push merged data to {} failed for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
hostPort,
shuffleId,
mapId,
attemptId,
partitionIds[i],
groupedBatchId,
batchIds[i]);
}
}
}
@ -1050,13 +1123,14 @@ public class ShuffleClientImpl extends ShuffleClient {
byte reason = response.get();
if (reason == StatusCode.HARD_SPLIT.getValue()) {
logger.info(
"Push merged data return hard split for map "
+ mapId
+ " attempt "
+ attemptId
+ " batches "
+ Arrays.toString(batchIds)
+ ".");
"Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
hostPort,
shuffleId,
mapId,
attemptId,
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds));
pushDataRetryPool.submit(
() ->
submitRetryPushMergedData(
@ -1071,24 +1145,32 @@ public class ShuffleClientImpl extends ShuffleClient {
remainReviveTimes));
} else if (reason == StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED.getValue()) {
logger.debug(
"Push data split for map {} attempt {} batchs {} return master congested.",
"Push merged data to {} master congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
hostPort,
shuffleId,
mapId,
attemptId,
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds));
pushState.onCongestControl(hostPort);
callback.onSuccess(response);
} else if (reason == StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED.getValue()) {
logger.debug(
"Push data split for map {} attempt {} batchs {} return slave congested.",
"Push merged data to {} slave congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
hostPort,
shuffleId,
mapId,
attemptId,
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds));
pushState.onCongestControl(hostPort);
callback.onSuccess(response);
} else {
// Should not happen in current architecture.
response.rewind();
logger.error("Push merged data should not receive this response");
logger.error("Push merged data should not receive this response.");
pushState.onSuccess(hostPort);
callback.onSuccess(response);
}
@ -1116,17 +1198,14 @@ public class ShuffleClientImpl extends ShuffleClient {
return;
}
logger.error(
"Push merged data to "
+ host
+ ":"
+ port
+ " failed for map "
+ mapId
+ " attempt "
+ attemptId
+ " batches "
+ Arrays.toString(batchIds)
+ ".",
"Push merged data to {} failed for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
hostPort,
shuffleId,
mapId,
attemptId,
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds),
e);
if (!mapperEnded(shuffleId, mapId, attemptId)) {
pushDataRetryPool.submit(
@ -1157,7 +1236,16 @@ public class ShuffleClientImpl extends ShuffleClient {
new RuntimeException("Mock push merge data failed.")));
}
} catch (Exception e) {
logger.warn("PushMergedData failed", e);
logger.error(
"Exception raised while pushing merged data for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {} location {}.",
shuffleId,
mapId,
attemptId,
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds),
hostPort,
e);
wrappedCallback.onFailure(
new Exception(
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
@ -1236,7 +1324,7 @@ public class ShuffleClientImpl extends ShuffleClient {
} catch (Exception e) {
// If some exceptions need to be ignored, they shouldn't be logged as error-level,
// otherwise it will mislead users.
logger.warn("Send UnregisterShuffle failed, ignore.", e);
logger.error("Send UnregisterShuffle failed, ignore.", e);
}
}
@ -1277,26 +1365,26 @@ public class ShuffleClientImpl extends ShuffleClient {
if (response.status() == StatusCode.SUCCESS) {
logger.info(
"Shuffle {} request reducer file group success using time:{} ms, result partition ids: {}",
"Shuffle {} request reducer file group success using {} ms, result partition ids {}.",
shuffleId,
(System.nanoTime() - getReducerFileGroupStartTime) / 1000_000,
response.fileGroup().keySet());
return new ReduceFileGroups(response.fileGroup(), response.attempts());
} else if (response.status() == StatusCode.STAGE_END_TIME_OUT) {
logger.warn(
"Request {} return {} for {}",
"Request {} return {} for {}.",
getReducerFileGroup,
StatusCode.STAGE_END_TIME_OUT.toString(),
shuffleKey);
} else if (response.status() == StatusCode.SHUFFLE_DATA_LOST) {
logger.warn(
"Request {} return {} for {}",
"Request {} return {} for {}.",
getReducerFileGroup,
StatusCode.SHUFFLE_DATA_LOST.toString(),
shuffleKey);
}
} catch (Exception e) {
logger.error("Exception raised while call GetReducerFileGroup for " + shuffleKey + ".", e);
logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleKey, e);
}
return null;
}
@ -1312,7 +1400,8 @@ public class ShuffleClientImpl extends ShuffleClient {
String applicationId, String shuffleKey, int shuffleId, int partitionId) throws IOException {
ReduceFileGroups reduceFileGroups = updateFileGroup(applicationId, shuffleKey, shuffleId);
if (reduceFileGroups == null) {
String msg = "Shuffle data lost for shuffle " + shuffleId + " reduce " + partitionId + "!";
String msg =
"Shuffle data lost for shuffle " + shuffleId + " partitionId " + partitionId + "!";
logger.error(msg);
throw new CelebornIOException(msg);
}
@ -1333,7 +1422,7 @@ public class ShuffleClientImpl extends ShuffleClient {
if (fileGroups.partitionGroups.size() == 0
|| !fileGroups.partitionGroups.containsKey(partitionId)) {
logger.warn("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId);
logger.warn("Shuffle data is empty for shuffle {} partition {}.", shuffleId, partitionId);
return RssInputStream.empty();
} else {
return RssInputStream.create(
@ -1390,7 +1479,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
private StatusCode getPushDataFailCause(String message) {
logger.info("[getPushDataFailCause] message: " + message);
logger.debug("Push data failed cause message: " + message);
StatusCode cause;
if (message.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())) {
cause = StatusCode.PUSH_DATA_FAIL_SLAVE;
@ -1439,9 +1528,9 @@ public class ShuffleClientImpl extends ShuffleClient {
final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
// return if shuffle stage already ended
if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"The mapper(shuffle {} map {} attempt {}) has already ended while"
+ " pushing data byteBuf.",
logger.info(
"Push data byteBuf to location {} ignored because mapper already ended for shuffle {} map {} attempt {}.",
location.hostAndPushPort(),
shuffleId,
mapId,
attemptId);
@ -1498,9 +1587,9 @@ public class ShuffleClientImpl extends ShuffleClient {
}
}
logger.debug(
"Push data byteBuf to {}:{} success for map {} attempt {} batch {}.",
location.getHost(),
location.getPushPort(),
"Push data byteBuf to {} success for shuffle {} map {} attemptId {} batch {}.",
location.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
nextBatchId);
@ -1516,16 +1605,17 @@ public class ShuffleClientImpl extends ShuffleClient {
pushState.exception.compareAndSet(
null, new CelebornIOException("PushData byteBuf failed!", e));
logger.error(
"Push data byteBuf to {}:{} failed for map {} attempt {} batch {}.",
location.getHost(),
location.getPushPort(),
"Push data byteBuf to {} failed for shuffle {} map {} attempt {} batch {}.",
location.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
nextBatchId,
e);
} else {
logger.warn(
"Mapper shuffleId:{} mapId:{} attempt:{} already ended, remove batchId:{}.",
"Push data to {} failed but mapper already ended for shuffle {} map {} attempt {} batch {}.",
location.hostAndPushPort(),
shuffleId,
mapId,
attemptId,
@ -1538,7 +1628,15 @@ public class ShuffleClientImpl extends ShuffleClient {
TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
client.pushData(pushData, pushDataTimeout, callback, closeCallBack);
} catch (Exception e) {
logger.warn("PushData byteBuf failed", e);
logger.error(
"Exception raised while pushing data byteBuf for shuffle {} map {} attempt {} partitionId {} batch {} location {}.",
shuffleId,
mapId,
attemptId,
partitionId,
nextBatchId,
location,
e);
callback.onFailure(
new Exception(
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage()
@ -1588,11 +1686,11 @@ public class ShuffleClientImpl extends ShuffleClient {
() -> {
String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
logger.info(
"pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}",
"PushDataHandShake shuffleKey {} attemptId {} locationId {}",
shuffleKey,
attemptId,
location.getUniqueId());
logger.debug("pushDataHandShake location:{}", location.toString());
logger.debug("PushDataHandShake location {}", location.toString());
TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
PushDataHandShake handShake =
new PushDataHandShake(
@ -1628,12 +1726,12 @@ public class ShuffleClientImpl extends ShuffleClient {
() -> {
String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
logger.info(
"regionStart regionId:{}, shuffleKey:{}, attemptId:{}, locationId:{}",
"RegionStart for shuffle {} regionId {} attemptId {} locationId {}.",
shuffleId,
currentRegionIdx,
shuffleKey,
attemptId,
location.getUniqueId());
logger.debug("regionStart location:{}", location.toString());
logger.debug("RegionStart for location {}.", location.toString());
TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
RegionStart regionStart =
new RegionStart(
@ -1673,11 +1771,13 @@ public class ShuffleClientImpl extends ShuffleClient {
} else {
// throw exception
logger.error(
"Exception raised while reviving for shuffle {} reduce {} epoch {}.",
"Exception raised while reviving for shuffle {} map {} attemptId {} partition {} epoch {}.",
shuffleId,
mapId,
attemptId,
location.getId(),
location.getEpoch());
throw new CelebornIOException("regiontstart revive failed");
throw new CelebornIOException("RegionStart revive failed");
}
}
return Optional.empty();
@ -1699,11 +1799,12 @@ public class ShuffleClientImpl extends ShuffleClient {
() -> {
final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
logger.info(
"regionFinish shuffleKey:{}, attemptId:{}, locationId:{}",
shuffleKey,
"RegionFinish for shuffle {} map {} attemptId {} locationId {}.",
shuffleId,
mapId,
attemptId,
location.getUniqueId());
logger.debug("regionFinish location:{}", location.toString());
logger.debug("RegionFinish for location {}.", location.toString());
TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
RegionFinish regionFinish =
new RegionFinish(MASTER_MODE, shuffleKey, location.getUniqueId(), attemptId);
@ -1727,7 +1828,8 @@ public class ShuffleClientImpl extends ShuffleClient {
// return if shuffle stage already ended
if (mapperEnded(shuffleId, mapId, attemptId)) {
logger.debug(
"The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.",
"Send message to {} ignored because mapper already ended for shuffle {} map {} attempt {}.",
location.hostAndPushPort(),
shuffleId,
mapId,
attemptId);
@ -1763,7 +1865,7 @@ public class ShuffleClientImpl extends ShuffleClient {
while (!Thread.currentThread().isInterrupted()
&& !isSuccess
&& retryTimes < conf.networkIoMaxRetries(TransportModuleConstants.PUSH_MODULE)) {
logger.debug("retrySendMessage times: {}", retryTimes);
logger.debug("RetrySendMessage retry times {}.", retryTimes);
try {
result = supplier.get();
isSuccess = true;