[CELEBORN-614][FOLLOWUP] Fix flushOnMemoryPressure condition

### What changes were proposed in this pull request?

Fix a bug introduced by the refactoring in CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517).

### Why are the changes needed?

This is a bug fix. The condition was accidentally inverted to `writer.getException != null` during CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517), which turned the trim into a no-op: writers without an error were never flushed on memory pressure.
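
The effect of the inverted condition can be illustrated with a minimal, self-contained sketch. `SketchWriter`, `FlushConditionSketch`, `trim`, and `newWriters` are hypothetical names used only for illustration; they model just the `getException` and `flushOnMemoryPressure` members that appear in the diff and are not the actual `StorageManager` code.

```scala
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiConsumer

// Hypothetical stand-in for the worker's FileWriter; only the two members
// touched by this patch are modeled.
final class SketchWriter(exception: Throwable) {
  var flushCount = 0
  def getException: Throwable = exception
  def flushOnMemoryPressure(): Unit = flushCount += 1
}

object FlushConditionSketch {
  // Flushes every writer selected by `shouldFlush` and returns how many were flushed.
  def trim(
      writers: ConcurrentHashMap[String, SketchWriter],
      shouldFlush: SketchWriter => Boolean): Int = {
    var flushed = 0
    writers.forEach(new BiConsumer[String, SketchWriter] {
      override def accept(file: String, writer: SketchWriter): Unit = {
        if (shouldFlush(writer)) {
          writer.flushOnMemoryPressure()
          flushed += 1
        }
      }
    })
    flushed
  }

  // Two healthy writers (no exception) and one writer that has already failed.
  def newWriters(): ConcurrentHashMap[String, SketchWriter] = {
    val writers = new ConcurrentHashMap[String, SketchWriter]()
    writers.put("healthy-1", new SketchWriter(null))
    writers.put("healthy-2", new SketchWriter(null))
    writers.put("failed", new SketchWriter(new IOException("disk error")))
    writers
  }

  // Inverted condition: only the already-failed writer is selected,
  // so the healthy writers keep holding their buffers.
  val flushedByInverted: Int = trim(newWriters(), _.getException != null)

  // Fixed condition: healthy writers are flushed and the failed one is skipped.
  val flushedByFixed: Int = trim(newWriters(), _.getException == null)

  def main(args: Array[String]): Unit = {
    println(s"inverted condition flushed $flushedByInverted writer(s)")
    println(s"fixed condition flushed $flushedByFixed writer(s)")
  }
}
```

In this sketch the inverted predicate never reaches the two healthy writers, which mirrors why the trim released no memory before the fix.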

### Does this PR introduce _any_ user-facing change?

No. The bug was caused by an unreleased commit.

### How was this patch tested?

Set the Worker off-heap memory to 2G and ran a 1T TeraSort.

Before: the trim did not trigger a disk buffer flush, so the worker could not recover from the pause pushdata state, and the job failed.

After: the trim correctly triggers a disk buffer flush and releases the worker memory, and the job succeeded.

<img width="1653" alt="image" src="https://github.com/apache/incubator-celeborn/assets/26535726/9ef62c78-e6a9-497f-9dac-d3f712e830cc">

Closes #1689 from pan3793/CELEBORN-614-followup.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Cheng Pan 2023-07-07 15:33:09 +08:00 committed by mingji
parent a556b02bc1
commit e314f85087


@ -629,7 +629,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
override def accept(t: File, writers: ConcurrentHashMap[String, FileWriter]): Unit = {
writers.forEach(new BiConsumer[String, FileWriter] {
override def accept(file: String, writer: FileWriter): Unit = {
- if (writer.getException != null) {
+ if (writer.getException == null) {
try {
writer.flushOnMemoryPressure()
} catch {
@ -638,6 +638,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
s"FileWrite of $writer faces unexpected exception when flush on memory pressure.",
t)
}
+ } else {
+ logWarning(s"Skip flushOnMemoryPressure because ${writer.flusher} " +
+ s"has error: ${writer.getException.getMessage}")
}
}
})