diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java index b3292b600..ee48e3a30 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.client.ShuffleClient; import org.apache.celeborn.client.read.CelebornInputStream; +import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.reflect.DynConstructors; @@ -129,9 +130,23 @@ public class CelebornShuffleConsumer reduceId.getTaskID().getId(), reduceId.getId()); + MetricsCallback metricsCallback = + new MetricsCallback() { + @Override + public void incBytesRead(long bytesRead) {} + + @Override + public void incReadTime(long time) {} + }; + CelebornInputStream shuffleInputStream = shuffleClient.readPartition( - 0, reduceId.getTaskID().getId(), reduceId.getId(), 0, Integer.MAX_VALUE); + 0, + reduceId.getTaskID().getId(), + reduceId.getId(), + 0, + Integer.MAX_VALUE, + metricsCallback); CelebornShuffleFetcher shuffleReader = new CelebornShuffleFetcher( reduceId, taskStatus, merger, copyPhase, reporter, metrics, shuffleInputStream); diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 22318e542..b4b566d1a 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -188,6 +188,7 @@ public abstract class ShuffleClient { * to read all partition data * @param endMapIndex the index of end map index of interested map range, set to * `Integer.MAX_VALUE` if you want to read all partition data + * @param metricsCallback callback to report metrics * @return * @throws IOException */