From 46d9d63e1f42b5280ee5aea814bdd18b2e83c780 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 21 May 2025 11:44:50 +0800 Subject: [PATCH] [CELEBORN-1916][FOLLOWUP] Improve Aliyun OSS support ### What changes were proposed in this pull request? Improve Aliyun OSS support including `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`. ### Why are the changes needed? There are many methods where OSS support is lacking in `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI. Closes #3268 from SteNicholas/CELEBORN-1916. Authored-by: SteNicholas Signed-off-by: mingji --- .../celeborn/common/protocol/StorageInfo.java | 8 +-- .../apache/celeborn/common/CelebornConf.scala | 11 +--- .../celeborn/common/CelebornConfSuite.scala | 3 + docs/configuration/master.md | 3 +- docs/migration.md | 2 + .../service/deploy/master/SlotsAllocator.java | 8 +-- .../worker/storage/PartitionDataWriter.java | 2 +- .../service/deploy/worker/Worker.scala | 2 +- .../worker/storage/StorageManager.scala | 56 +++++++++---------- 9 files changed, 43 insertions(+), 52 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java index 8509d5717..b8d9428c3 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java @@ -202,14 +202,14 @@ public class StorageInfo implements Serializable { return availableStorageTypes == HDFS_MASK; } - public boolean HDFSOnly() { - return StorageInfo.HDFSOnly(availableStorageTypes); - } - public static boolean S3Only(int availableStorageTypes) { return availableStorageTypes == S3_MASK; } + public static boolean OSSOnly(int availableStorageTypes) { + return 
availableStorageTypes == OSS_MASK; + } + public static boolean OSSAvailable(int availableStorageTypes) { return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK || (availableStorageTypes & OSS_MASK) > 0; diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index d3a9977ef..c994c2802 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -931,7 +931,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED) def clientShuffleDynamicResourceFactor: Double = get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR) def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT) - def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT) def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT) def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL) def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED) @@ -2384,19 +2383,11 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("300s") - val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] = - buildConf("celeborn.master.hdfs.expireDirs.timeout") - .categories("master") - .version("0.3.0") - .doc("The timeout for a expire dirs to be deleted on HDFS.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("1h") - val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.master.dfs.expireDirs.timeout") .categories("master") .version("0.6.0") - .doc("The timeout for a expire dirs to be deleted on S3 or HDFS.") + .doc("The timeout for an expired dirs to be deleted on dfs like HDFS, S3, OSS.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1h") diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index a4e721ceb..2fedcdb32 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -248,6 +248,9 @@ class CelebornConfSuite extends CelebornFunSuite { conf.set("celeborn.storage.availableTypes", "S3") assert(conf.availableStorageTypes == StorageInfo.S3_MASK) + + conf.set("celeborn.storage.availableTypes", "OSS") + assert(conf.availableStorageTypes == StorageInfo.OSS_MASK) } test("Test role rpcDispatcherNumThreads") { diff --git a/docs/configuration/master.md b/docs/configuration/master.md index f17cdcc00..add7c136e 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -37,14 +37,13 @@ license: | | celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | | | celeborn.master.allowWorkerHostPattern | <undefined> | false | Pattern of worker host that allowed to register with the master. If not set, all workers are allowed to register. | 0.6.0 | | | celeborn.master.denyWorkerHostPattern | <undefined> | false | Pattern of worker host that denied to register with the master. If not set, no workers are denied to register. | 0.6.0 | | -| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on S3 or HDFS. | 0.6.0 | | +| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for an expired dirs to be deleted on dfs like HDFS, S3, OSS. | 0.6.0 | | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize | | celeborn.master.estimatedPartitionSize.maxSize | <undefined> | false | Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2. 
| 0.4.1 | | | celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false | Initial delay time before start updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.initialDelay | | celeborn.master.estimatedPartitionSize.update.interval | 10min | false | Interval of updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.interval | | celeborn.master.excludeWorker.unhealthyDiskRatioThreshold | 1.0 | false | Max ratio of unhealthy disks for excluding worker, when unhealthy disk is larger than max unhealthy count, master will exclude worker. If this value is set to 1, master will exclude worker of which disks are all unhealthy. | 0.6.0 | | -| celeborn.master.hdfs.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on HDFS. | 0.3.0 | | | celeborn.master.heartbeat.application.timeout | 300s | false | Application heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout | | celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout | | celeborn.master.host | <localhost> | false | Hostname for master to bind. | 0.2.0 | | diff --git a/docs/migration.md b/docs/migration.md index 616974d69..9512c6edb 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -31,6 +31,8 @@ license: | - Since 0.6.0, Celeborn modified `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` if you want to set user level quota. +- Since 0.6.0, Celeborn modified `celeborn.master.hdfs.expireDirs.timeout` to `celeborn.master.dfs.expireDirs.timeout`. 
Please use `celeborn.master.dfs.expireDirs.timeout` if you want to set timeout for an expired dirs to be deleted. + - Since 0.6.0, Celeborn changed the default value of `celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means Celeborn will involve more workers in offering slots. - Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead. diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index 62582018f..608c3e7bd 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -126,11 +126,9 @@ public class SlotsAllocator { if (workers.size() < 2 && shouldReplicate) { return new HashMap<>(); } - if (StorageInfo.HDFSOnly(availableStorageTypes)) { - return offerSlotsRoundRobin( - workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); - } - if (StorageInfo.S3Only(availableStorageTypes)) { + if (StorageInfo.HDFSOnly(availableStorageTypes) + || StorageInfo.S3Only(availableStorageTypes) + || StorageInfo.OSSOnly(availableStorageTypes)) { return offerSlotsRoundRobin( workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes); } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 3cacd4866..932890237 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -155,7 +155,7 @@ public class PartitionDataWriter implements DeviceObserver { } else if
(diskFileInfo.isS3()) { storageInfo = new StorageInfo(StorageInfo.Type.S3, true, diskFileInfo.getFilePath()); } else if (diskFileInfo.isOSS()) { - return new StorageInfo(StorageInfo.Type.OSS, true, diskFileInfo.getFilePath()); + storageInfo = new StorageInfo(StorageInfo.Type.OSS, true, diskFileInfo.getFilePath()); } else { storageInfo = new StorageInfo(StorageInfo.Type.HDFS, true, diskFileInfo.getFilePath()); } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 502740cd7..0a1e14ff4 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -490,7 +490,7 @@ private[celeborn] class Worker( val diskInfos = workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk => disk.mountPoint -> disk - }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ storageManager.s3DiskInfo + }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ storageManager.s3DiskInfo ++ storageManager.ossDiskInfo workerStatusManager.checkIfNeedTransitionStatus() val response = masterClient.askSync[HeartbeatFromWorkerResponse]( HeartbeatFromWorker( diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 1d369cf27..345f109ce 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -238,9 +238,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val activeTypes = conf.availableStorageTypes lazy val localOrDfsStorageAvailable: Boolean = { - StorageInfo.OSSAvailable(activeTypes) || 
StorageInfo.HDFSAvailable( - activeTypes) || StorageInfo.localDiskAvailable( - activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty + StorageInfo.OSSAvailable(activeTypes) || StorageInfo.S3Available(activeTypes) || + StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(activeTypes) || + hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty } override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized { @@ -842,33 +842,31 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs override def accept( t: File, writers: ConcurrentHashMap[String, PartitionDataWriter]): Unit = { - writers.forEach(new BiConsumer[String, PartitionDataWriter] { - override def accept(file: String, writer: PartitionDataWriter): Unit = { - if (writer.getException == null) { - try { - writer.flushOnMemoryPressure() - } catch { - case t: Throwable => - logError( - s"FileWrite of $writer faces unexpected exception when flush on memory pressure.", - t) - } - } else { - logWarning(s"Skip flushOnMemoryPressure because ${writer.getFlusher} " + - s"has error: ${writer.getException.getMessage}") - } + flushOnMemoryPressure(writers) + } + }) + flushOnMemoryPressure(hdfsWriters) + flushOnMemoryPressure(s3Writers) + flushOnMemoryPressure(ossWriters) + } + + private def flushOnMemoryPressure(writers: ConcurrentHashMap[String, PartitionDataWriter]) + : Unit = { + writers.forEach(new BiConsumer[String, PartitionDataWriter] { + override def accept(file: String, writer: PartitionDataWriter): Unit = { + if (writer.getException == null) { + try { + writer.flushOnMemoryPressure() + } catch { + case t: Throwable => + logError( + s"FileWrite of $writer faces unexpected exception when flush on memory pressure.", + t) } - }) - } - }) - hdfsWriters.forEach(new BiConsumer[String, PartitionDataWriter] { - override def accept(t: String, u: PartitionDataWriter): Unit = { - 
u.flushOnMemoryPressure() - } - }) - s3Writers.forEach(new BiConsumer[String, PartitionDataWriter] { - override def accept(t: String, u: PartitionDataWriter): Unit = { - u.flushOnMemoryPressure() + } else { + logWarning(s"Skip flushOnMemoryPressure because ${writer.getFlusher} " + + s"has error: ${writer.getException.getMessage}") + } } }) }