From e314f85087eaa2c0761daefa290df11a0ff56f49 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 7 Jul 2023 15:33:09 +0800 Subject: [PATCH] [CELEBORN-614][FOLLOWUP] Fix flushOnMemoryPressure condition ### What changes were proposed in this pull request? Fix the refactor bug of CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517). ### Why are the changes needed? This is a bug fix, the condition `writer.getException != null` was inverted accidentally during CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517), which causes the trim became no-op. ### Does this PR introduce _any_ user-facing change? No. The bug was caused by an unreleased commit. ### How was this patch tested? Set Worker off-heap memory to 2G, and run 1T tera sort. Before: the trim does not trigger disk buffer flush, causing the worker can not to recover from the pause pushdata state, then Job failed. After: the trim correctly triggers disk buffer flush, releases the worker memory, and the Job succeeded. image Closes #1689 from pan3793/CELEBORN-614-followup. Authored-by: Cheng Pan Signed-off-by: mingji --- .../service/deploy/worker/storage/StorageManager.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 37fe0581d..97095ede1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -629,7 +629,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs override def accept(t: File, writers: ConcurrentHashMap[String, FileWriter]): Unit = { writers.forEach(new BiConsumer[String, FileWriter] { override def accept(file: String, writer: FileWriter): Unit = { - if (writer.getException != null) { + if (writer.getException == null) { try { writer.flushOnMemoryPressure() } catch { @@ -638,6 +638,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs s"FileWrite of $writer faces unexpected exception when flush on memory pressure.", t) } + } else { + logWarning(s"Skip flushOnMemoryPressure because ${writer.flusher} " + + s"has error: ${writer.getException.getMessage}") } } })