diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index c8274ae80..c914e404b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1346,6 +1346,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED) def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW) + def tagsEnabled: Boolean = get(TAGS_ENABLED) def tagsExpr: String = get(TAGS_EXPR) def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR) @@ -5975,6 +5976,14 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val TAGS_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.tags.enabled") + .categories("master") + .doc("Whether to enable tags for workers.") + .version("0.6.0") + .booleanConf + .createWithDefault(true) + val TAGS_EXPR: ConfigEntry[String] = buildConf("celeborn.tags.tagsExpr") .categories("master", "client") diff --git a/docs/configuration/master.md b/docs/configuration/master.md index dfef34107..bdb9a791a 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -91,6 +91,7 @@ license: | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 0.6.0 | | | celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the tags expression provided by the client over the tags expression provided by the master. | 0.6.0 | | | celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 2859ce3c5..e0328ac6d 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -857,7 +857,7 @@ private[celeborn] class Master( val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId) var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet) - if (requestSlots.tagsExpr.nonEmpty) { + if (conf.tagsEnabled) { availableWorkers = tagsManager.getTaggedWorkers( requestSlots.userIdentifier, requestSlots.tagsExpr,