diff --git a/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala b/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala index 18abc5e3d..e6f3cdccb 100644 --- a/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala +++ b/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala @@ -27,19 +27,19 @@ import org.apache.celeborn.common.CelebornConf class CelebornColumnarShuffleReader[K, C]( handle: CelebornShuffleHandle[K, _, C], - startMapIndex: Int = 0, - endMapIndex: Int = Int.MaxValue, startPartition: Int, endPartition: Int, + startMapIndex: Int = 0, + endMapIndex: Int = Int.MaxValue, context: TaskContext, conf: CelebornConf, metrics: ShuffleReadMetricsReporter) extends CelebornShuffleReader[K, C]( handle, - startMapIndex, - endMapIndex, startPartition, endPartition, + startMapIndex, + endMapIndex, context, conf, metrics) { diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 49e3a950c..603cae517 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -264,15 +264,15 @@ public class SparkShuffleManager implements ShuffleManager { ShuffleReadMetricsReporter metrics) { if (handle instanceof CelebornShuffleHandle) { return getCelebornShuffleReader( - handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics); + handle, startPartition, endPartition, startMapIndex, endMapIndex, context, metrics); } return SparkUtils.getReader( sortShuffleManager(), handle, - startMapIndex, - endMapIndex, startPartition, endPartition, + startMapIndex, + endMapIndex, context, metrics); } @@ -286,15 +286,15 @@ public class SparkShuffleManager implements ShuffleManager { ShuffleReadMetricsReporter metrics) { if (handle instanceof CelebornShuffleHandle) { return getCelebornShuffleReader( - handle, 0, Integer.MAX_VALUE, startPartition, endPartition, context, metrics); + handle, startPartition, endPartition, 0, Integer.MAX_VALUE, context, metrics); } return SparkUtils.getReader( sortShuffleManager(), handle, - 0, - Integer.MAX_VALUE, startPartition, endPartition, + 0, + Integer.MAX_VALUE, context, metrics); } @@ -310,45 +310,45 @@ public class SparkShuffleManager implements ShuffleManager { ShuffleReadMetricsReporter metrics) { if (handle instanceof CelebornShuffleHandle) { return getCelebornShuffleReader( - handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics); + handle, startPartition, endPartition, startMapIndex, endMapIndex, context, metrics); } return SparkUtils.getReader( sortShuffleManager(), handle, - startMapIndex, - endMapIndex, startPartition, endPartition, + startMapIndex, + endMapIndex, context, metrics); } public ShuffleReader getCelebornShuffleReader( ShuffleHandle handle, - int startMapIndex, - int endMapIndex, int startPartition, int endPartition, + int startMapIndex, + int endMapIndex, TaskContext context, ShuffleReadMetricsReporter metrics) { CelebornShuffleHandle h = (CelebornShuffleHandle) handle; if (COLUMNAR_SHUFFLE_CLASSES_PRESENT && celebornConf.columnarShuffleEnabled()) { return SparkUtils.createColumnarShuffleReader( h, - startMapIndex, - endMapIndex, startPartition, endPartition, + startMapIndex, + endMapIndex, context, celebornConf, metrics); } else { return new CelebornShuffleReader<>( h, - startMapIndex, - endMapIndex, startPartition, endPartition, + startMapIndex, + endMapIndex, context, celebornConf, metrics); diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index fd9ce1dd5..46a54f2ff 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -135,10 +135,10 @@ public class SparkUtils { public static ShuffleReader getReader( SortShuffleManager sortShuffleManager, ShuffleHandle handle, - Integer startMapIndex, - Integer endMapIndex, Integer startPartition, Integer endPartition, + Integer startMapIndex, + Integer endMapIndex, TaskContext context, ShuffleReadMetricsReporter metrics) { ShuffleReader shuffleReader = @@ -200,10 +200,10 @@ public class SparkUtils { public static CelebornShuffleReader createColumnarShuffleReader( CelebornShuffleHandle handle, - int startMapIndex, - int endMapIndex, int startPartition, int endPartition, + int startMapIndex, + int endMapIndex, TaskContext context, CelebornConf conf, ShuffleReadMetricsReporter metrics) { @@ -212,10 +212,10 @@ public class SparkUtils { .invoke( null, handle, - startMapIndex, - endMapIndex, startPartition, endPartition, + startMapIndex, + endMapIndex, context, conf, metrics); diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index cb76ed015..eadb655c9 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -30,10 +30,10 @@ import org.apache.celeborn.common.CelebornConf class CelebornShuffleReader[K, C]( handle: CelebornShuffleHandle[K, _, C], - startMapIndex: Int = 0, - endMapIndex: Int = Int.MaxValue, startPartition: Int, endPartition: Int, + startMapIndex: Int = 0, + endMapIndex: Int = Int.MaxValue, context: TaskContext, conf: CelebornConf, metrics: ShuffleReadMetricsReporter)