[CELEBORN-654][SPARK] SortBasedShuffleWriter does not require mapStatusRecords in Spark 3

### What changes were proposed in this pull request?

`mapStatusRecords` is required in Spark 2 for constructing `MapStatus` when AQE is enabled, but not in Spark 3, so remove it to save memory and compute resources.

This PR also simplifies the `for loop` code.

### Why are the changes needed?

Remove unnecessary variables to save resources and clean up code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

Closes #1564 from pan3793/CELEBORN-654.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-06-09 09:43:08 +08:00
parent 1ae8eb7145
commit 0636e3ca40
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
2 changed files with 12 additions and 19 deletions

View File

@ -323,10 +323,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
long pushStartTime = System.nanoTime();
if (pipelined) {
for (int i = 0; i < pushers.length; i++) {
pushers[i].waitPushFinish();
pushers[i].pushData();
pushers[i].close();
for (SortBasedPusher pusher : pushers) {
pusher.waitPushFinish();
pusher.pushData();
pusher.close();
}
} else {
currentPusher.pushData();
@ -344,13 +344,11 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
private void updateMapStatus() {
long recordsWritten = 0;
for (int i = 0; i < partitioner.numPartitions(); i++) {
for (int i = 0; i < tmpRecords.length; i++) {
mapStatusRecords[i] += tmpRecords[i];
recordsWritten += tmpRecords[i];
writeMetrics.incRecordsWritten(tmpRecords[i]);
tmpRecords[i] = 0;
}
writeMetrics.incRecordsWritten(recordsWritten);
}
@Override

View File

@ -83,7 +83,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final SerializationStream serOutputStream;
private final LongAdder[] mapStatusLengths;
private final long[] mapStatusRecords;
private final long[] tmpRecords;
/**
@ -124,7 +123,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
serOutputStream = serializer.serializeStream(serBuffer);
this.mapStatusLengths = new LongAdder[numPartitions];
this.mapStatusRecords = new long[numPartitions];
for (int i = 0; i < numPartitions; i++) {
this.mapStatusLengths[i] = new LongAdder();
}
@ -331,10 +329,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
long pushStartTime = System.nanoTime();
if (pipelined) {
for (int i = 0; i < pushers.length; i++) {
pushers[i].waitPushFinish();
pushers[i].pushData();
pushers[i].close();
for (SortBasedPusher pusher : pushers) {
pusher.waitPushFinish();
pusher.pushData();
pusher.close();
}
} else {
currentPusher.pushData();
@ -354,13 +352,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
private void updateMapStatus() {
long recordsWritten = 0;
for (int i = 0; i < partitioner.numPartitions(); i++) {
mapStatusRecords[i] += tmpRecords[i];
recordsWritten += tmpRecords[i];
for (int i = 0; i < tmpRecords.length; i++) {
writeMetrics.incRecordsWritten(tmpRecords[i]);
tmpRecords[i] = 0;
}
writeMetrics.incRecordsWritten(recordsWritten);
}
@Override