From 54732c7b3822a11e2ddcf777b3bbadcccba0da4c Mon Sep 17 00:00:00 2001 From: Nicolas Fraison Date: Thu, 8 May 2025 16:52:44 +0800 Subject: [PATCH] Update celeborn conf to add S3 in default and doc for policy ### What changes were proposed in this pull request? Add S3 type in evict and create policies Add S3 type in list of default evict and create policy ### Why are the changes needed? To align with other types ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Closes #3218 from ashangit/nfraison/doc_s3. Authored-by: Nicolas Fraison Signed-off-by: mingji --- README.md | 5 +++++ .../org/apache/celeborn/common/CelebornConf.scala | 12 ++++++------ .../apache/celeborn/common/CelebornConfSuite.scala | 4 ++-- docs/configuration/worker.md | 4 ++-- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 1eb87b279..fc5d834e1 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,11 @@ To compile for Spark 3.5 with Java21, please use the following command > **_NOTE:_** Celeborn supports automatic builds on linux aarch64 platform via `aarch64` profile. `aarch64` profile requires glibc version 3.4.21. There is potential problematic frame `C [libc.so.6+0x8412a]` for other glibc version like 2.x etc. +To build Celeborn with AWS S3 support MPU, please use the following command +```shell +./build/make-distribution.sh --sbt-enabled -Pspark-3.4 -Pjdk-8 -Paws +``` + To build Celeborn with Aliyun OSS support MPU, please use the following command ```shell ./build/make-distribution.sh --sbt-enabled -Pspark-3.4 -Pjdk-8 -Paliyun diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 1fcb5fe35..f2cc17613 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1247,7 +1247,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerStoragePolicyCreateFilePolicy: Option[List[String]] = get(WORKER_STORAGE_CREATE_FILE_POLICY).map { policy => policy.split(",").map(_.trim).toList - }.orElse(Some(List("MEMORY", "SSD", "HDD", "HDFS", "OSS"))) + }.orElse(Some(List("MEMORY", "SSD", "HDD", "HDFS", "S3", "OSS"))) def workerStoragePolicyEvictFilePolicy: Option[Map[String, List[String]]] = get(WORKER_STORAGE_EVICT_POLICY).map { @@ -1256,7 +1256,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se val groupArr = group.split(",") Map(groupArr.head -> groupArr.slice(1, groupArr.length).toList) }).reduce(_ ++ _) - }.orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS")))) + }.orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "S3", "OSS")))) // ////////////////////////////////////////////////////// // Decommission // @@ -3286,20 +3286,20 @@ object CelebornConf extends Logging { buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy") .categories("worker") .doc("This defined the order for creating files across available storages." + - " Available storages options are: MEMORY,SSD,HDD,HDFS,OSS") + " Available storages options are: MEMORY,SSD,HDD,HDFS,S3,OSS") .version("0.5.1") .stringConf .checkValue( _.split(",").map(str => StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p => p), - "Will use default create file order. Default order: MEMORY,SSD,HDD,HDFS,OSS") + "Will use default create file order. Default order: MEMORY,SSD,HDD,HDFS,S3,OSS") .createOptional val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] = buildConf("celeborn.worker.storage.storagePolicy.evictPolicy") .categories("worker") .doc("This define the order of evict files if the storages are available." + - " Available storages: MEMORY,SSD,HDD,HDFS. " + + " Available storages: MEMORY,SSD,HDD,HDFS,S3,OSS. " + "Definition: StorageTypes|StorageTypes|StorageTypes. " + "Example: MEMORY,SSD|SSD,HDFS." + " The example means that a MEMORY shuffle file can be evicted to SSD " + @@ -3309,7 +3309,7 @@ object CelebornConf extends Logging { .checkValue( _.replace("|", ",").split(",").map(str => StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p => p), - "Will use default evict order. Default order: MEMORY,SSD,HDD,HDFS,OSS") + "Will use default evict order. Default order: MEMORY,SSD,HDD,HDFS,S3,OSS") .createOptional val WORKER_HTTP_HOST: ConfigEntry[String] = diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index ef0922f8c..a4e721ceb 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -427,7 +427,7 @@ class CelebornConfSuite extends CelebornFunSuite { conf.unset("celeborn.worker.storage.storagePolicy.createFilePolicy") val createFilePolicy3 = conf.workerStoragePolicyCreateFilePolicy - assert(List("MEMORY", "SSD", "HDD", "HDFS", "OSS") == createFilePolicy3.get) + assert(List("MEMORY", "SSD", "HDD", "HDFS", "S3", "OSS") == createFilePolicy3.get) try { conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "ABC") @@ -450,7 +450,7 @@ class CelebornConfSuite extends CelebornFunSuite { conf.unset("celeborn.worker.storage.storagePolicy.evictPolicy") val evictPolicy3 = conf.workerStoragePolicyEvictFilePolicy - assert(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS")) == evictPolicy3.get) + assert(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "S3", "OSS")) == evictPolicy3.get) try { conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "ABC") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 854b1ae3f..99739d7d9 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -188,8 +188,8 @@ license: | | celeborn.worker.storage.disk.reserve.ratio | <undefined> | false | Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the max space between the reserved space and the space calculate via reserved ratio. | 0.3.2 | | | celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size | | celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on disk. | 0.3.2 | | -| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 | | -| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The example means that a MEMORY shuffle file can be evicted to SSD and a SSD shuffle file can be evicted to HDFS. | 0.5.1 | | +| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,S3,OSS | 0.5.1 | | +| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS,S3,OSS. Definition: StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The example means that a MEMORY shuffle file can be evicted to SSD and a SSD shuffle file can be evicted to HDFS. | 0.5.1 | | | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir | | celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | | | celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | |