diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 37fe0581d..97095ede1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -629,7 +629,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs override def accept(t: File, writers: ConcurrentHashMap[String, FileWriter]): Unit = { writers.forEach(new BiConsumer[String, FileWriter] { override def accept(file: String, writer: FileWriter): Unit = { - if (writer.getException != null) { + if (writer.getException == null) { try { writer.flushOnMemoryPressure() } catch { @@ -638,6 +638,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs s"FileWrite of $writer faces unexpected exception when flush on memory pressure.", t) } + } else { + logWarning(s"Skip flushOnMemoryPressure because ${writer.flusher} " + + s"has error: ${writer.getException.getMessage}") } } })