diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 5b71e2f34..d485e8c49 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -187,11 +187,23 @@ public class ShuffleClientImpl extends ShuffleClient { new CelebornIOException("Revive Failed, remain revive times " + remainReviveTimes)); } else if (mapperEnded(shuffleId, mapId, attemptId)) { logger.debug( - "Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId); + "Revive for push data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.", + shuffleId, + mapId, + attemptId, + partitionId, + batchId); pushState.removeBatch(batchId, loc.hostAndPushPort()); } else { PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(partitionId); - logger.info("Revive success, new location for reduce {} is {}.", partitionId, newLoc); + logger.info( + "Revive for push data success, new location for shuffle {} map {} attempt {} partition {} batch {} is location {}.", + shuffleId, + mapId, + attemptId, + partitionId, + batchId, + newLoc); try { if (!testRetryRevive || remainReviveTimes < 1) { TransportClient client = @@ -207,12 +219,14 @@ public class ShuffleClientImpl extends ShuffleClient { "Mock push data submit retry failed. remainReviveTimes = " + remainReviveTimes + "."); } } catch (Exception e) { - logger.warn( - "Exception raised while pushing data for shuffle {} map {} attempt {}" + " batch {}.", + logger.error( + "Exception raised while pushing data for shuffle {} map {} attempt {} partition {} batch {} location {}.", shuffleId, mapId, attemptId, + partitionId, batchId, + newLoc, e); wrappedCallback.onFailure( new Exception( @@ -260,10 +274,22 @@ public class ShuffleClientImpl extends ShuffleClient { } } else if (mapperEnded(shuffleId, mapId, attemptId)) { logger.debug( - "Retrying push data, but the mapper(map {} attempt {}) has ended.", mapId, attemptId); + "Revive for push merged data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.", + shuffleId, + mapId, + attemptId, + partitionId, + oldGroupedBatchId); } else { PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(partitionId); - logger.info("Revive success, new location for reduce {} is {}.", partitionId, newLoc); + logger.info( + "Revive for push merged data success, new location for shuffle {} map {} attempt {} partition {} batch {} is location {}.", + shuffleId, + mapId, + attemptId, + partitionId, + oldGroupedBatchId, + newLoc); DataBatches newDataBatches = newDataBatchesMap.computeIfAbsent(genAddressPair(newLoc), (s) -> new DataBatches()); newDataBatches.addDataBatch(newLoc, batch.batchId, batch.body); @@ -330,10 +356,12 @@ public class ShuffleClientImpl extends ShuffleClient { String appId, int shuffleId, int numMappers, int mapId, int attemptId) throws IOException { int partitionId = PackedPartitionId.packedPartitionId(mapId, attemptId); logger.info( - "register mapPartitionTask, mapId: {}, attemptId: {}, partitionId: {}", + "Register MapPartition task for shuffle {} map {} attempt {} partition {} with {} mapper.", + shuffleId, mapId, attemptId, - partitionId); + partitionId, + numMappers); ConcurrentHashMap partitionLocationMap = registerShuffleInternal( shuffleId, @@ -386,17 +414,17 @@ public class ShuffleClientImpl extends ShuffleClient { return result; } else if (StatusCode.SLOT_NOT_AVAILABLE.equals(respStatus)) { logger.error( - "LifecycleManager request slots return {}, retry again, remain retry times {}", + "LifecycleManager request slots return {}, retry again, remain retry times {}.", StatusCode.SLOT_NOT_AVAILABLE, numRetries - 1); } else if (StatusCode.RESERVE_SLOTS_FAILED.equals(respStatus)) { logger.error( - "LifecycleManager request slots return {}, retry again, remain retry times {}", + "LifecycleManager request slots return {}, retry again, remain retry times {}.", StatusCode.RESERVE_SLOTS_FAILED, numRetries - 1); } else { logger.error( - "LifecycleManager request slots return {}, retry again, remain retry times {}", + "LifecycleManager request slots return {}, retry again, remain retry times {}.", StatusCode.REQUEST_FAILED, numRetries - 1); } @@ -426,7 +454,8 @@ public class ShuffleClientImpl extends ShuffleClient { boolean reachLimit = pushState.limitMaxInFlight(hostAndPushPort); if (reachLimit) { - throw new CelebornIOException("wait timeout for task " + mapKey, pushState.exception.get()); + throw new CelebornIOException( + "Waiting timeout for task " + mapKey, pushState.exception.get()); } } @@ -434,7 +463,8 @@ public class ShuffleClientImpl extends ShuffleClient { boolean reachLimit = pushState.limitZeroInFlight(); if (reachLimit) { - throw new CelebornIOException("wait timeout for task " + mapKey, pushState.exception.get()); + throw new CelebornIOException( + "Waiting timeout for task " + mapKey, pushState.exception.get()); } } @@ -450,7 +480,7 @@ public class ShuffleClientImpl extends ShuffleClient { try { TimeUnit.MILLISECONDS.sleep(sleepTimeMs); } catch (InterruptedException e) { - logger.warn("Wait revived location interrupted", e); + logger.error("Waiting revived location was interrupted.", e); Thread.currentThread().interrupt(); } } @@ -471,10 +501,10 @@ public class ShuffleClientImpl extends ShuffleClient { ConcurrentHashMap map = reducePartitionMap.get(shuffleId); if (waitRevivedLocation(map, partitionId, epoch)) { logger.debug( - "Has already revived for shuffle {} map {} reduce {} epoch {}," - + " just return(Assume revive successfully).", + "Revive already success for shuffle {} map {} attempt {} partition {} epoch {}, just return true(Assume revive successfully).", shuffleId, mapId, + attemptId, partitionId, epoch); return true; @@ -482,10 +512,11 @@ public class ShuffleClientImpl extends ShuffleClient { String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); if (mapperEnded(shuffleId, mapId, attemptId)) { logger.debug( - "The mapper(shuffle {} map {}) has already ended, just return(Assume" - + " revive successfully).", + "Revive success, but the mapper ended for shuffle {} map {} attempt {} partition {}, just return true(Assume revive successfully).", shuffleId, - mapId); + mapId, + attemptId, + partitionId); return true; } @@ -509,6 +540,12 @@ public class ShuffleClientImpl extends ShuffleClient { map.put(partitionId, PbSerDeUtils.fromPbPartitionLocation(response.getLocation())); return true; } else if (StatusCode.MAP_ENDED.equals(respStatus)) { + logger.debug( + "Revive success, but the mapper ended for shuffle {} map {} attempt {} partition {}, just return true(Assume revive successfully).", + shuffleId, + mapId, + attemptId, + partitionId); mapperEndMap.computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet()).add(mapKey); return true; } else { @@ -516,8 +553,10 @@ public class ShuffleClientImpl extends ShuffleClient { } } catch (Exception e) { logger.error( - "Exception raised while reviving for shuffle {} reduce {} epoch {}.", + "Exception raised while reviving for shuffle {} map {} attempt {} partition {} epoch {}.", shuffleId, + mapId, + attemptId, partitionId, epoch, e); @@ -544,10 +583,11 @@ public class ShuffleClientImpl extends ShuffleClient { // return if shuffle stage already ended if (mapperEnded(shuffleId, mapId, attemptId)) { logger.debug( - "The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.", + "Push or merge data ignored because mapper already ended for shuffle {} map {} attempt {} partition {}.", shuffleId, mapId, - attemptId); + attemptId, + partitionId); PushState pushState = pushStates.get(mapKey); if (pushState != null) { pushState.cleanup(); @@ -559,7 +599,7 @@ public class ShuffleClientImpl extends ShuffleClient { getPartitionLocation(applicationId, shuffleId, numMappers, numPartitions); if (map == null) { - throw new CelebornIOException("Register shuffle failed for shuffle " + shuffleKey); + throw new CelebornIOException("Register shuffle failed for shuffle " + shuffleKey + "."); } // get location @@ -575,16 +615,17 @@ public class ShuffleClientImpl extends ShuffleClient { null, StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE)) { throw new CelebornIOException( - "Revive for shuffle " + shuffleKey + " partitionId " + partitionId + " failed."); + "Revive for shuffle " + shuffleKey + " partition " + partitionId + " failed."); } } if (mapperEnded(shuffleId, mapId, attemptId)) { logger.debug( - "The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.", + "Push or merge data ignored because mapper already ended for shuffle {} map {} attempt {} partition {}.", shuffleId, mapId, - attemptId); + attemptId, + partitionId); PushState pushState = pushStates.get(mapKey); if (pushState != null) { pushState.cleanup(); @@ -622,14 +663,6 @@ public class ShuffleClientImpl extends ShuffleClient { compressor.getCompressedBuffer(), 0, body, BATCH_HEADER_SIZE, compressedTotalSize); if (doPush) { - logger.debug( - "Do push data for app {} shuffle {} map {} attempt {} reduce {} batch {}.", - applicationId, - shuffleId, - mapId, - attemptId, - partitionId, - nextBatchId); // check limit limitMaxInFlight(mapKey, pushState, loc.hostAndPushPort()); @@ -654,11 +687,12 @@ public class ShuffleClientImpl extends ShuffleClient { .add(mapKey); } logger.debug( - "Push data to {}:{} success for map {} attempt {} batch {}.", - loc.getHost(), - loc.getPushPort(), + "Push data to {} success for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId); } @@ -667,11 +701,12 @@ public class ShuffleClientImpl extends ShuffleClient { pushState.exception.compareAndSet( null, new CelebornIOException("Revived PushData failed!", e)); logger.error( - "Push data to {}:{} failed for map {} attempt {} batch {}.", - loc.getHost(), - loc.getPushPort(), + "Push data to {} failed for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId, e); } @@ -687,18 +722,24 @@ public class ShuffleClientImpl extends ShuffleClient { byte reason = response.get(); if (reason == StatusCode.SOFT_SPLIT.getValue()) { logger.debug( - "Push data split required for map {} attempt {} batch {}", + "Push data to {} soft split required for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId); splitPartition(shuffleId, partitionId, applicationId, loc); pushState.onSuccess(loc.hostAndPushPort()); callback.onSuccess(response); } else if (reason == StatusCode.HARD_SPLIT.getValue()) { logger.debug( - "Push data split for map {} attempt {} batch {}.", + "Push data to {} hard split required for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId); pushDataRetryPool.submit( () -> @@ -716,17 +757,23 @@ public class ShuffleClientImpl extends ShuffleClient { remainReviveTimes)); } else if (reason == StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED.getValue()) { logger.debug( - "Push data split for map {} attempt {} batch {} return master congested.", + "Push data to {} master congestion required for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId); pushState.onCongestControl(loc.hostAndPushPort()); callback.onSuccess(response); } else if (reason == StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED.getValue()) { logger.debug( - "Push data split for map {} attempt {} batch {} return slave congested.", + "Push data to {} slave congestion required for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId); pushState.onCongestControl(loc.hostAndPushPort()); callback.onSuccess(response); @@ -763,11 +810,12 @@ public class ShuffleClientImpl extends ShuffleClient { } logger.error( - "Push data to {}:{} failed for map {} attempt {} batch {}.", - loc.getHost(), - loc.getPushPort(), + "Push data to {} failed for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), + shuffleId, mapId, attemptId, + partitionId, nextBatchId, e); // async retry push data @@ -790,10 +838,12 @@ public class ShuffleClientImpl extends ShuffleClient { } else { pushState.removeBatch(nextBatchId, loc.hostAndPushPort()); logger.info( - "Mapper shuffleId:{} mapId:{} attempt:{} already ended, remove batchId:{}.", + "Push data to {} failed but mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.", + loc.hostAndPushPort(), shuffleId, mapId, attemptId, + partitionId, nextBatchId); } } @@ -812,7 +862,15 @@ public class ShuffleClientImpl extends ShuffleClient { new RuntimeException("Mock push data first time failed."))); } } catch (Exception e) { - logger.warn("PushData failed", e); + logger.error( + "Exception raised while pushing data for shuffle {} map {} attempt {} partition {} batch {} location {}.", + shuffleId, + mapId, + attemptId, + partitionId, + nextBatchId, + loc, + e); wrappedCallback.onFailure( new Exception( StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage() @@ -852,7 +910,7 @@ public class ShuffleClientImpl extends ShuffleClient { synchronized (splittingSet) { if (splittingSet.contains(partitionId)) { logger.debug( - "shuffle {} partitionId {} is splitting, skip split request ", shuffleId, partitionId); + "Splitting for shuffle {} partition {}, skip split request.", shuffleId, partitionId); return; } splittingSet.add(partitionId); @@ -981,6 +1039,7 @@ public class ShuffleClientImpl extends ShuffleClient { pushState.addBatch(groupedBatchId, hostPort); final int numBatches = batches.size(); + final Integer[] partitionIds = new Integer[numBatches]; final String[] partitionUniqueIds = new String[numBatches]; final int[] offsets = new int[numBatches]; final int[] batchIds = new int[numBatches]; @@ -988,6 +1047,7 @@ public class ShuffleClientImpl extends ShuffleClient { CompositeByteBuf byteBuf = Unpooled.compositeBuffer(); for (int i = 0; i < numBatches; i++) { DataBatches.DataBatch batch = batches.get(i); + partitionIds[i] = batch.loc.getId(); partitionUniqueIds[i] = batch.loc.getUniqueId(); offsets[i] = currentSize; batchIds[i] = batch.batchId; @@ -1004,10 +1064,14 @@ public class ShuffleClientImpl extends ShuffleClient { @Override public void onSuccess(ByteBuffer response) { logger.debug( - "Push data success for map {} attempt {} grouped batch {}.", + "Push merged data to {} success for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + hostPort, + shuffleId, mapId, attemptId, - groupedBatchId); + Arrays.toString(partitionIds), + groupedBatchId, + Arrays.toString(batchIds)); pushState.removeBatch(groupedBatchId, hostPort); // TODO Need to adjust maxReqsInFlight if server response is congested, see CELEBORN-62 if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) { @@ -1022,21 +1086,30 @@ public class ShuffleClientImpl extends ShuffleClient { String errorMsg = (remainReviveTimes < maxReviveTimes ? "Revived push" : "Push") + " merged data to " - + host - + ":" - + port + + hostPort + " failed for map " + mapId + " attempt " + attemptId - + " batches " + + " partition " + + Arrays.toString(partitionIds) + + " groupedBatch " + + groupedBatchId + + " batch " + Arrays.toString(batchIds) + "."; pushState.exception.compareAndSet(null, new CelebornIOException(errorMsg, e)); if (logger.isDebugEnabled()) { - for (int batchId : batchIds) { + for (int i = 0; i < numBatches; i++) { logger.debug( - "Push data failed for map {} attempt {} batch {}.", mapId, attemptId, batchId); + "Push merged data to {} failed for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + hostPort, + shuffleId, + mapId, + attemptId, + partitionIds[i], + groupedBatchId, + batchIds[i]); } } } @@ -1050,13 +1123,14 @@ public class ShuffleClientImpl extends ShuffleClient { byte reason = response.get(); if (reason == StatusCode.HARD_SPLIT.getValue()) { logger.info( - "Push merged data return hard split for map " - + mapId - + " attempt " - + attemptId - + " batches " - + Arrays.toString(batchIds) - + "."); + "Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + hostPort, + shuffleId, + mapId, + attemptId, + Arrays.toString(partitionIds), + groupedBatchId, + Arrays.toString(batchIds)); pushDataRetryPool.submit( () -> submitRetryPushMergedData( @@ -1071,24 +1145,32 @@ public class ShuffleClientImpl extends ShuffleClient { remainReviveTimes)); } else if (reason == StatusCode.PUSH_DATA_SUCCESS_MASTER_CONGESTED.getValue()) { logger.debug( - "Push data split for map {} attempt {} batchs {} return master congested.", + "Push merged data to {} master congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + hostPort, + shuffleId, mapId, attemptId, + Arrays.toString(partitionIds), + groupedBatchId, Arrays.toString(batchIds)); pushState.onCongestControl(hostPort); callback.onSuccess(response); } else if (reason == StatusCode.PUSH_DATA_SUCCESS_SLAVE_CONGESTED.getValue()) { logger.debug( - "Push data split for map {} attempt {} batchs {} return slave congested.", + "Push merged data to {} slave congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + hostPort, + shuffleId, mapId, attemptId, + Arrays.toString(partitionIds), + groupedBatchId, Arrays.toString(batchIds)); pushState.onCongestControl(hostPort); callback.onSuccess(response); } else { // Should not happen in current architecture. response.rewind(); - logger.error("Push merged data should not receive this response"); + logger.error("Push merged data should not receive this response."); pushState.onSuccess(hostPort); callback.onSuccess(response); } @@ -1116,17 +1198,14 @@ public class ShuffleClientImpl extends ShuffleClient { return; } logger.error( - "Push merged data to " - + host - + ":" - + port - + " failed for map " - + mapId - + " attempt " - + attemptId - + " batches " - + Arrays.toString(batchIds) - + ".", + "Push merged data to {} failed for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + hostPort, + shuffleId, + mapId, + attemptId, + Arrays.toString(partitionIds), + groupedBatchId, + Arrays.toString(batchIds), e); if (!mapperEnded(shuffleId, mapId, attemptId)) { pushDataRetryPool.submit( @@ -1157,7 +1236,16 @@ public class ShuffleClientImpl extends ShuffleClient { new RuntimeException("Mock push merge data failed."))); } } catch (Exception e) { - logger.warn("PushMergedData failed", e); + logger.error( + "Exception raised while pushing merged data for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {} location {}.", + shuffleId, + mapId, + attemptId, + Arrays.toString(partitionIds), + groupedBatchId, + Arrays.toString(batchIds), + hostPort, + e); wrappedCallback.onFailure( new Exception( StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage() @@ -1236,7 +1324,7 @@ public class ShuffleClientImpl extends ShuffleClient { } catch (Exception e) { // If some exceptions need to be ignored, they shouldn't be logged as error-level, // otherwise it will mislead users. - logger.warn("Send UnregisterShuffle failed, ignore.", e); + logger.error("Send UnregisterShuffle failed, ignore.", e); } } @@ -1277,26 +1365,26 @@ public class ShuffleClientImpl extends ShuffleClient { if (response.status() == StatusCode.SUCCESS) { logger.info( - "Shuffle {} request reducer file group success using time:{} ms, result partition ids: {}", + "Shuffle {} request reducer file group success using {} ms, result partition ids {}.", shuffleId, (System.nanoTime() - getReducerFileGroupStartTime) / 1000_000, response.fileGroup().keySet()); return new ReduceFileGroups(response.fileGroup(), response.attempts()); } else if (response.status() == StatusCode.STAGE_END_TIME_OUT) { logger.warn( - "Request {} return {} for {}", + "Request {} return {} for {}.", getReducerFileGroup, StatusCode.STAGE_END_TIME_OUT.toString(), shuffleKey); } else if (response.status() == StatusCode.SHUFFLE_DATA_LOST) { logger.warn( - "Request {} return {} for {}", + "Request {} return {} for {}.", getReducerFileGroup, StatusCode.SHUFFLE_DATA_LOST.toString(), shuffleKey); } } catch (Exception e) { - logger.error("Exception raised while call GetReducerFileGroup for " + shuffleKey + ".", e); + logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleKey, e); } return null; } @@ -1312,7 +1400,8 @@ public class ShuffleClientImpl extends ShuffleClient { String applicationId, String shuffleKey, int shuffleId, int partitionId) throws IOException { ReduceFileGroups reduceFileGroups = updateFileGroup(applicationId, shuffleKey, shuffleId); if (reduceFileGroups == null) { - String msg = "Shuffle data lost for shuffle " + shuffleId + " reduce " + partitionId + "!"; + String msg = + "Shuffle data lost for shuffle " + shuffleId + " partitionId " + partitionId + "!"; logger.error(msg); throw new CelebornIOException(msg); } @@ -1333,7 +1422,7 @@ public class ShuffleClientImpl extends ShuffleClient { if (fileGroups.partitionGroups.size() == 0 || !fileGroups.partitionGroups.containsKey(partitionId)) { - logger.warn("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId); + logger.warn("Shuffle data is empty for shuffle {} partition {}.", shuffleId, partitionId); return RssInputStream.empty(); } else { return RssInputStream.create( @@ -1390,7 +1479,7 @@ public class ShuffleClientImpl extends ShuffleClient { } private StatusCode getPushDataFailCause(String message) { - logger.info("[getPushDataFailCause] message: " + message); + logger.debug("Push data failed cause message: " + message); StatusCode cause; if (message.startsWith(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())) { cause = StatusCode.PUSH_DATA_FAIL_SLAVE; @@ -1439,9 +1528,9 @@ public class ShuffleClientImpl extends ShuffleClient { final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId); // return if shuffle stage already ended if (mapperEnded(shuffleId, mapId, attemptId)) { - logger.debug( - "The mapper(shuffle {} map {} attempt {}) has already ended while" - + " pushing data byteBuf.", + logger.info( + "Push data byteBuf to location {} ignored because mapper already ended for shuffle {} map {} attempt {}.", + location.hostAndPushPort(), shuffleId, mapId, attemptId); @@ -1498,9 +1587,9 @@ public class ShuffleClientImpl extends ShuffleClient { } } logger.debug( - "Push data byteBuf to {}:{} success for map {} attempt {} batch {}.", - location.getHost(), - location.getPushPort(), + "Push data byteBuf to {} success for shuffle {} map {} attemptId {} batch {}.", + location.hostAndPushPort(), + shuffleId, mapId, attemptId, nextBatchId); @@ -1516,16 +1605,17 @@ public class ShuffleClientImpl extends ShuffleClient { pushState.exception.compareAndSet( null, new CelebornIOException("PushData byteBuf failed!", e)); logger.error( - "Push data byteBuf to {}:{} failed for map {} attempt {} batch {}.", - location.getHost(), - location.getPushPort(), + "Push data byteBuf to {} failed for shuffle {} map {} attempt {} batch {}.", + location.hostAndPushPort(), + shuffleId, mapId, attemptId, nextBatchId, e); } else { logger.warn( - "Mapper shuffleId:{} mapId:{} attempt:{} already ended, remove batchId:{}.", + "Push data to {} failed but mapper already ended for shuffle {} map {} attempt {} batch {}.", + location.hostAndPushPort(), shuffleId, mapId, attemptId, @@ -1538,7 +1628,15 @@ public class ShuffleClientImpl extends ShuffleClient { TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState); client.pushData(pushData, pushDataTimeout, callback, closeCallBack); } catch (Exception e) { - logger.warn("PushData byteBuf failed", e); + logger.error( + "Exception raised while pushing data byteBuf for shuffle {} map {} attempt {} partitionId {} batch {} location {}.", + shuffleId, + mapId, + attemptId, + partitionId, + nextBatchId, + location, + e); callback.onFailure( new Exception( StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER.getMessage() @@ -1588,11 +1686,11 @@ public class ShuffleClientImpl extends ShuffleClient { () -> { String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId); logger.info( - "pushDataHandShake shuffleKey:{}, attemptId:{}, locationId:{}", + "PushDataHandShake shuffleKey {} attemptId {} locationId {}", shuffleKey, attemptId, location.getUniqueId()); - logger.debug("pushDataHandShake location:{}", location.toString()); + logger.debug("PushDataHandShake location {}", location.toString()); TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState); PushDataHandShake handShake = new PushDataHandShake( @@ -1628,12 +1726,12 @@ public class ShuffleClientImpl extends ShuffleClient { () -> { String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId); logger.info( - "regionStart regionId:{}, shuffleKey:{}, attemptId:{}, locationId:{}", + "RegionStart for shuffle {} regionId {} attemptId {} locationId {}.", + shuffleId, currentRegionIdx, - shuffleKey, attemptId, location.getUniqueId()); - logger.debug("regionStart location:{}", location.toString()); + logger.debug("RegionStart for location {}.", location.toString()); TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState); RegionStart regionStart = new RegionStart( @@ -1673,11 +1771,13 @@ public class ShuffleClientImpl extends ShuffleClient { } else { // throw exception logger.error( - "Exception raised while reviving for shuffle {} reduce {} epoch {}.", + "Exception raised while reviving for shuffle {} map {} attemptId {} partition {} epoch {}.", shuffleId, + mapId, + attemptId, location.getId(), location.getEpoch()); - throw new CelebornIOException("regiontstart revive failed"); + throw new CelebornIOException("RegionStart revive failed"); } } return Optional.empty(); @@ -1699,11 +1799,12 @@ public class ShuffleClientImpl extends ShuffleClient { () -> { final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId); logger.info( - "regionFinish shuffleKey:{}, attemptId:{}, locationId:{}", - shuffleKey, + "RegionFinish for shuffle {} map {} attemptId {} locationId {}.", + shuffleId, + mapId, attemptId, location.getUniqueId()); - logger.debug("regionFinish location:{}", location.toString()); + logger.debug("RegionFinish for location {}.", location.toString()); TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState); RegionFinish regionFinish = new RegionFinish(MASTER_MODE, shuffleKey, location.getUniqueId(), attemptId); @@ -1727,7 +1828,8 @@ public class ShuffleClientImpl extends ShuffleClient { // return if shuffle stage already ended if (mapperEnded(shuffleId, mapId, attemptId)) { logger.debug( - "The mapper(shuffle {} map {} attempt {}) has already ended while" + " pushing data.", + "Send message to {} ignored because mapper already ended for shuffle {} map {} attempt {}.", + location.hostAndPushPort(), shuffleId, mapId, attemptId); @@ -1763,7 +1865,7 @@ public class ShuffleClientImpl extends ShuffleClient { while (!Thread.currentThread().isInterrupted() && !isSuccess && retryTimes < conf.networkIoMaxRetries(TransportModuleConstants.PUSH_MODULE)) { - logger.debug("retrySendMessage times: {}", retryTimes); + logger.debug("RetrySendMessage retry times {}.", retryTimes); try { result = supplier.get(); isSuccess = true;