[CELEBORN-919][FOLLOWUP] Put map index args after partition index args in CelebornShuffleReader constructor

### What changes were proposed in this pull request?

Put map index args after partition index args in CelebornShuffleReader constructor

### Why are the changes needed?

#1853 changed the args order in CelebornShuffleReader constructor. It will break gluten celeborn shuffle manager.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Run test locally.

Closes #1869 from zhouyifan279/shuffle-reader-ctor.

Authored-by: zhouyifan279 <zhouyifan279@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
zhouyifan279 2023-08-31 17:22:10 +08:00 committed by zky.zhoukeyong
parent 194f0fcf09
commit 3bad1c8abc
4 changed files with 27 additions and 27 deletions

View File

@ -27,19 +27,19 @@ import org.apache.celeborn.common.CelebornConf
class CelebornColumnarShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
startPartition: Int,
endPartition: Int,
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter)
extends CelebornShuffleReader[K, C](
handle,
startMapIndex,
endMapIndex,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
conf,
metrics) {

View File

@ -264,15 +264,15 @@ public class SparkShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics);
handle, startPartition, endPartition, startMapIndex, endMapIndex, context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
handle,
startMapIndex,
endMapIndex,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
metrics);
}
@ -286,15 +286,15 @@ public class SparkShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
handle, 0, Integer.MAX_VALUE, startPartition, endPartition, context, metrics);
handle, startPartition, endPartition, 0, Integer.MAX_VALUE, context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
handle,
0,
Integer.MAX_VALUE,
startPartition,
endPartition,
0,
Integer.MAX_VALUE,
context,
metrics);
}
@ -310,45 +310,45 @@ public class SparkShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
handle, startMapIndex, endMapIndex, startPartition, endPartition, context, metrics);
handle, startPartition, endPartition, startMapIndex, endMapIndex, context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
handle,
startMapIndex,
endMapIndex,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
metrics);
}
public <K, C> ShuffleReader<K, C> getCelebornShuffleReader(
ShuffleHandle handle,
int startMapIndex,
int endMapIndex,
int startPartition,
int endPartition,
int startMapIndex,
int endMapIndex,
TaskContext context,
ShuffleReadMetricsReporter metrics) {
CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) handle;
if (COLUMNAR_SHUFFLE_CLASSES_PRESENT && celebornConf.columnarShuffleEnabled()) {
return SparkUtils.createColumnarShuffleReader(
h,
startMapIndex,
endMapIndex,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
celebornConf,
metrics);
} else {
return new CelebornShuffleReader<>(
h,
startMapIndex,
endMapIndex,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
celebornConf,
metrics);

View File

@ -135,10 +135,10 @@ public class SparkUtils {
public static <K, C> ShuffleReader<K, C> getReader(
SortShuffleManager sortShuffleManager,
ShuffleHandle handle,
Integer startMapIndex,
Integer endMapIndex,
Integer startPartition,
Integer endPartition,
Integer startMapIndex,
Integer endMapIndex,
TaskContext context,
ShuffleReadMetricsReporter metrics) {
ShuffleReader<K, C> shuffleReader =
@ -200,10 +200,10 @@ public class SparkUtils {
public static <K, C> CelebornShuffleReader<K, C> createColumnarShuffleReader(
CelebornShuffleHandle<K, ?, C> handle,
int startMapIndex,
int endMapIndex,
int startPartition,
int endPartition,
int startMapIndex,
int endMapIndex,
TaskContext context,
CelebornConf conf,
ShuffleReadMetricsReporter metrics) {
@ -212,10 +212,10 @@ public class SparkUtils {
.invoke(
null,
handle,
startMapIndex,
endMapIndex,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
conf,
metrics);

View File

@ -30,10 +30,10 @@ import org.apache.celeborn.common.CelebornConf
class CelebornShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
startPartition: Int,
endPartition: Int,
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter)