[CELEBORN-1916][FOLLOWUP] Improve Aliyun OSS support

### What changes were proposed in this pull request?

Improve Aliyun OSS support including `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.

### Why are the changes needed?

There are many methods where OSS support is lacking in `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3268 from SteNicholas/CELEBORN-1916.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
SteNicholas 2025-05-21 11:44:50 +08:00 committed by mingji
parent 0b5a09a9f7
commit 46d9d63e1f
9 changed files with 43 additions and 52 deletions

View File

@ -202,14 +202,14 @@ public class StorageInfo implements Serializable {
return availableStorageTypes == HDFS_MASK;
}
public boolean HDFSOnly() {
return StorageInfo.HDFSOnly(availableStorageTypes);
}
public static boolean S3Only(int availableStorageTypes) {
return availableStorageTypes == S3_MASK;
}
public static boolean OSSOnly(int availableStorageTypes) {
return availableStorageTypes == OSS_MASK;
}
public static boolean OSSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;

View File

@ -931,7 +931,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED)
def clientShuffleDynamicResourceFactor: Double = get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR)
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED)
@ -2384,19 +2383,11 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("300s")
val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.hdfs.expireDirs.timeout")
.categories("master")
.version("0.3.0")
.doc("The timeout for a expire dirs to be deleted on HDFS.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.dfs.expireDirs.timeout")
.categories("master")
.version("0.6.0")
.doc("The timeout for a expire dirs to be deleted on S3 or HDFS.")
.doc("The timeout for an expired dirs to be deleted on dfs like HDFS, S3, OSS.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")

View File

@ -248,6 +248,9 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.availableTypes", "S3")
assert(conf.availableStorageTypes == StorageInfo.S3_MASK)
conf.set("celeborn.storage.availableTypes", "OSS")
assert(conf.availableStorageTypes == StorageInfo.OSS_MASK)
}
test("Test role rpcDispatcherNumThreads") {

View File

@ -37,14 +37,13 @@ license: |
| celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | |
| celeborn.master.allowWorkerHostPattern | &lt;undefined&gt; | false | Pattern of worker host that allowed to register with the master. If not set, all workers are allowed to register. | 0.6.0 | |
| celeborn.master.denyWorkerHostPattern | &lt;undefined&gt; | false | Pattern of worker host that denied to register with the master. If not set, no workers are denied to register. | 0.6.0 | |
| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on S3 or HDFS. | 0.6.0 | |
| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for an expired dirs to be deleted on dfs like HDFS, S3, OSS. | 0.6.0 | |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
| celeborn.master.estimatedPartitionSize.maxSize | &lt;undefined&gt; | false | Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false | Initial delay time before start updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | false | Interval of updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.interval |
| celeborn.master.excludeWorker.unhealthyDiskRatioThreshold | 1.0 | false | Max ratio of unhealthy disks for excluding worker, when unhealthy disk is larger than max unhealthy count, master will exclude worker. If this value is set to 1, master will exclude worker of which disks are all unhealthy. | 0.6.0 | |
| celeborn.master.hdfs.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on HDFS. | 0.3.0 | |
| celeborn.master.heartbeat.application.timeout | 300s | false | Application heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
| celeborn.master.host | &lt;localhost&gt; | false | Hostname for master to bind. | 0.2.0 | |

View File

@ -31,6 +31,8 @@ license: |
- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` if you want to set user level quota.
- Since 0.6.0, Celeborn modified `celeborn.master.hdfs.expireDirs.timeout` to `celeborn.master.dfs.expireDirs.timeout`. Please use `celeborn.master.dfs.expireDirs.timeout` if you want to set the timeout for expired dirs to be deleted.
- Since 0.6.0, Celeborn changed the default value of `celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means Celeborn will involve more workers in offering slots.
- Since 0.6.0, Celeborn deprecates `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.

View File

@ -126,11 +126,9 @@ public class SlotsAllocator {
if (workers.size() < 2 && shouldReplicate) {
return new HashMap<>();
}
if (StorageInfo.HDFSOnly(availableStorageTypes)) {
return offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes);
}
if (StorageInfo.S3Only(availableStorageTypes)) {
if (StorageInfo.HDFSOnly(availableStorageTypes)
|| StorageInfo.S3Only(availableStorageTypes)
|| StorageInfo.OSSOnly(availableStorageTypes)) {
return offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes);
}

View File

@ -155,7 +155,7 @@ public class PartitionDataWriter implements DeviceObserver {
} else if (diskFileInfo.isS3()) {
storageInfo = new StorageInfo(StorageInfo.Type.S3, true, diskFileInfo.getFilePath());
} else if (diskFileInfo.isOSS()) {
return new StorageInfo(StorageInfo.Type.OSS, true, diskFileInfo.getFilePath());
storageInfo = new StorageInfo(StorageInfo.Type.OSS, true, diskFileInfo.getFilePath());
} else {
storageInfo = new StorageInfo(StorageInfo.Type.HDFS, true, diskFileInfo.getFilePath());
}

View File

@ -490,7 +490,7 @@ private[celeborn] class Worker(
val diskInfos =
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
disk.mountPoint -> disk
}.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ storageManager.s3DiskInfo
}.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ storageManager.s3DiskInfo ++ storageManager.ossDiskInfo
workerStatusManager.checkIfNeedTransitionStatus()
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(

View File

@ -238,9 +238,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val activeTypes = conf.availableStorageTypes
lazy val localOrDfsStorageAvailable: Boolean = {
StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
activeTypes) || StorageInfo.localDiskAvailable(
activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
StorageInfo.OSSAvailable(activeTypes) || StorageInfo.S3Available(activeTypes) ||
StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(activeTypes) ||
hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
}
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
@ -842,33 +842,31 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
override def accept(
t: File,
writers: ConcurrentHashMap[String, PartitionDataWriter]): Unit = {
writers.forEach(new BiConsumer[String, PartitionDataWriter] {
override def accept(file: String, writer: PartitionDataWriter): Unit = {
if (writer.getException == null) {
try {
writer.flushOnMemoryPressure()
} catch {
case t: Throwable =>
logError(
s"FileWrite of $writer faces unexpected exception when flush on memory pressure.",
t)
}
} else {
logWarning(s"Skip flushOnMemoryPressure because ${writer.getFlusher} " +
s"has error: ${writer.getException.getMessage}")
}
flushOnMemoryPressure(writers)
}
})
flushOnMemoryPressure(hdfsWriters)
flushOnMemoryPressure(s3Writers)
flushOnMemoryPressure(ossWriters)
}
private def flushOnMemoryPressure(writers: ConcurrentHashMap[String, PartitionDataWriter])
: Unit = {
writers.forEach(new BiConsumer[String, PartitionDataWriter] {
override def accept(file: String, writer: PartitionDataWriter): Unit = {
if (writer.getException == null) {
try {
writer.flushOnMemoryPressure()
} catch {
case t: Throwable =>
logError(
s"FileWrite of $writer faces unexpected exception when flush on memory pressure.",
t)
}
})
}
})
hdfsWriters.forEach(new BiConsumer[String, PartitionDataWriter] {
override def accept(t: String, u: PartitionDataWriter): Unit = {
u.flushOnMemoryPressure()
}
})
s3Writers.forEach(new BiConsumer[String, PartitionDataWriter] {
override def accept(t: String, u: PartitionDataWriter): Unit = {
u.flushOnMemoryPressure()
} else {
logWarning(s"Skip flushOnMemoryPressure because ${writer.getFlusher} " +
s"has error: ${writer.getException.getMessage}")
}
}
})
}