[CELEBORN-1621][FOLLOWUP] Support enabling worker tags via config

### What changes were proposed in this pull request?

- Adding support to enable/disable worker tags feature by a master config flag.
- Fixed BUG: After this change #2936, admins can also define the tagsExpr for users. In a case user is passing an empty tagsExpr current code will ignore the admin defined tagsExpr and allow job to use all workers.

### Why are the changes needed?

https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?

NA

### How was this patch tested?
Existing UTs

Closes #2953 from s0nskar/tags-enabled.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
Sanskar Modi 2024-11-28 11:22:35 +08:00 committed by mingji
parent 6a0f763e23
commit 259dfcd988
3 changed files with 11 additions and 1 deletions

View File

@ -1346,6 +1346,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
def tagsEnabled: Boolean = get(TAGS_ENABLED)
def tagsExpr: String = get(TAGS_EXPR)
def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)
@ -5975,6 +5976,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
val TAGS_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.tags.enabled")
.categories("master")
.doc("Whether to enable tags for workers.")
.version("0.6.0")
.booleanConf
.createWithDefault(true)
val TAGS_EXPR: ConfigEntry[String] =
buildConf("celeborn.tags.tagsExpr")
.categories("master", "client")

View File

@ -91,6 +91,7 @@ license: |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 0.6.0 | |
| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the tags expression provided by the client over the tags expression provided by the master. | 0.6.0 | |
| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | |
<!--end-include-->

View File

@ -857,7 +857,7 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)
var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
if (requestSlots.tagsExpr.nonEmpty) {
if (conf.tagsEnabled) {
availableWorkers = tagsManager.getTaggedWorkers(
requestSlots.userIdentifier,
requestSlots.tagsExpr,