From 4465a9229b94ab683b8fb5b6e4e13aafd5996b38 Mon Sep 17 00:00:00 2001 From: sychen Date: Sat, 4 Nov 2023 20:21:47 +0800 Subject: [PATCH] [CELEBORN-1048][FOLLOWUP] MR module compile ### What changes were proposed in this pull request? Let the MR module compile successfully. ### Why are the changes needed? #2000 added parameters in the `ShuffleClient#readPartition` method, resulting in MR module compilation failure. MR CI is still missing. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? local test ```bash ./build/make-distribution.sh -Pmr ``` Closes #2069 from cxzl25/CELEBORN-1048-FOLLOWUP. Authored-by: sychen Signed-off-by: zky.zhoukeyong --- .../task/reduce/CelebornShuffleConsumer.java | 17 ++++++++++++++++- .../apache/celeborn/client/ShuffleClient.java | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java index b3292b600..ee48e3a30 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.client.ShuffleClient; import org.apache.celeborn.client.read.CelebornInputStream; +import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.reflect.DynConstructors; @@ -129,9 +130,23 @@ public class CelebornShuffleConsumer reduceId.getTaskID().getId(), reduceId.getId()); + MetricsCallback metricsCallback = + new MetricsCallback() { + @Override + public void incBytesRead(long bytesRead) {} + + @Override + public void incReadTime(long time) {} + }; + CelebornInputStream shuffleInputStream = shuffleClient.readPartition( - 0, reduceId.getTaskID().getId(), reduceId.getId(), 0, Integer.MAX_VALUE); + 0, + reduceId.getTaskID().getId(), + reduceId.getId(), + 0, + Integer.MAX_VALUE, + metricsCallback); CelebornShuffleFetcher shuffleReader = new CelebornShuffleFetcher( reduceId, taskStatus, merger, copyPhase, reporter, metrics, shuffleInputStream); diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 22318e542..b4b566d1a 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -188,6 +188,7 @@ public abstract class ShuffleClient { * to read all partition data * @param endMapIndex the index of end map index of interested map range, set to * `Integer.MAX_VALUE` if you want to read all partition data + * @param metricsCallback callback to report metrics * @return * @throws IOException */