diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java index c48bbe1fb..948bcbe86 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -19,6 +19,7 @@ package org.apache.celeborn.server.common.service.config; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,7 @@ public abstract class DynamicConfig { private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class); protected Map configs = new HashMap<>(); protected volatile Quota quota = null; + protected volatile Map> tags = null; public abstract DynamicConfig getParentLevelConfig(); @@ -131,6 +133,21 @@ public abstract class DynamicConfig { return configs; } + public Map> getTags() { + if (tags == null) { + synchronized (DynamicConfig.class) { + if (tags == null) { + tags = currentTags(); + } + } + } + return tags; + } + + protected Map> currentTags() { + return null; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("DynamicConfig{"); diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java index 125e81d73..2b7b4bcca 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -20,10 +20,7 @@ package org.apache.celeborn.server.common.service.config; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; @@ -38,6 +35,8 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements Config private static final String CONF_LEVEL = "level"; private static final String CONF_CONFIG = "config"; + private static final String TAGS_CONFIG = "tags"; + public FsConfigServiceImpl(CelebornConf celebornConf) throws IOException { super(celebornConf); } @@ -51,7 +50,10 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements Config for (Map configMap : configs) { if (ConfigLevel.SYSTEM.name().equals(configMap.get(CONF_LEVEL))) { if (configMap.containsKey(CONF_CONFIG)) { - systemConfigAtomicReference.set(new SystemConfig(celebornConf, getConfigs(configMap))); + systemConfigAtomicReference.get().setConfigs(getConfigs(configMap)); + } + if (configMap.containsKey(TAGS_CONFIG)) { + systemConfigAtomicReference.get().setTags(getTags(configMap)); } } else { if (configMap.containsKey(CONF_TENANT_ID)) { @@ -88,6 +90,16 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements Config .collect(Collectors.toMap(Map.Entry::getKey, config -> config.getValue().toString())); } + private Map> getTags(Map configMap) { + Map tagsConfig = (Map) configMap.get(TAGS_CONFIG); + if (tagsConfig == null) return Collections.emptyMap(); + return tagsConfig.entrySet().stream() + .filter(tags -> tags.getValue() != null) + .collect( + Collectors.toMap( + Map.Entry::getKey, tags -> new HashSet<>((ArrayList) tags.getValue()))); + } + private File getConfigFile(Map env) throws IOException { File configFile = celebornConf.dynamicConfigStoreFsPath().isEmpty() diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java index c9f627e3e..be420275e 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java @@ -20,6 +20,7 @@ package org.apache.celeborn.server.common.service.config; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.internal.config.ConfigEntry; @@ -63,4 +64,17 @@ public class SystemConfig extends DynamicConfig { return formatValue; } } + + public void setConfigs(Map configs) { + this.configs = configs; + } + + public void setTags(Map> tags) { + this.tags = tags; + } + + @Override + protected Map> currentTags() { + return this.tags; + } } diff --git a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java index 340124f08..95d0fc6c8 100644 --- a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java +++ b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java @@ -23,6 +23,9 @@ import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; @@ -307,4 +310,68 @@ public class ConfigServiceSuiteJ { Assert.assertEquals(gmtTime, clusterInfo.getGmtCreate()); Assert.assertEquals(gmtTime, clusterInfo.getGmtModify()); } + + @Test + public void testTags() throws IOException { + CelebornConf celebornConf = new CelebornConf(); + String file = getClass().getResource("/dynamicConfig_tags.yaml").getFile(); + celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH(), file); + celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L); + configService = new FsConfigServiceImpl(celebornConf); + + verifyTags(configService); + + // change -> refresh config + file = getClass().getResource("/dynamicConfig_tags2.yaml").getFile(); + celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH(), file); + configService.refreshCache(); + + verifyModifiedTags(configService); + } + + public void verifyTags(ConfigService configService) { + SystemConfig systemConfig = configService.getSystemConfigFromCache(); + Map> tags = systemConfig.getTags(); + + Set tag1 = tags.getOrDefault("tag1", new HashSet<>()); + Assert.assertEquals(tag1.size(), 2); + Assert.assertTrue(tag1.contains("host1:1111")); + Assert.assertTrue(tag1.contains("host2:2222")); + + Set tag2 = tags.getOrDefault("tag2", new HashSet<>()); + Assert.assertEquals(tag2.size(), 2); + Assert.assertTrue(tag2.contains("host3:3333")); + Assert.assertTrue(tag2.contains("host4:4444")); + + Set tag3 = tags.getOrDefault("tag3", new HashSet<>()); + Assert.assertEquals(tag3.size(), 0); + + verifyTenantAndUserTagsAsNull(configService); + } + + public void verifyModifiedTags(ConfigService configService) { + System.out.println("Tags changed"); + + SystemConfig systemConfig = configService.getSystemConfigFromCache(); + Map> tags = systemConfig.getTags(); + + Set tag1 = tags.getOrDefault("tag1", new HashSet<>()); + Assert.assertEquals(tag1.size(), 1); + Assert.assertTrue(tag1.contains("host1:1111")); + + Set tag2 = tags.getOrDefault("tag2", new HashSet<>()); + Assert.assertEquals(tag2.size(), 0); + + Set tag3 = tags.getOrDefault("tag3", new HashSet<>()); + Assert.assertEquals(tag3.size(), 1); + Assert.assertTrue(tag3.contains("host5:5555")); + } + + public void verifyTenantAndUserTagsAsNull(ConfigService configService) { + TenantConfig tenantConfig = configService.getRawTenantConfigFromCache("tenant_id1"); + Assert.assertNull(tenantConfig.getTags()); + + DynamicConfig userConfig = configService.getTenantUserConfigFromCache("tenant_id1", "Jerry"); + Assert.assertNull(userConfig.getTags()); + } } diff --git a/service/src/test/resources/dynamicConfig_tags.yaml b/service/src/test/resources/dynamicConfig_tags.yaml new file mode 100644 index 000000000..d764463ee --- /dev/null +++ b/service/src/test/resources/dynamicConfig_tags.yaml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.test.int.only: 100 + tags: + tag1: + - 'host1:1111' + - 'host2:2222' + tag2: + - 'host3:3333' + - 'host4:4444' + +- tenantId: tenant_id1 + level: TENANT + config: + celeborn.test.int.only: 50 + # Tags should be null for tenant/user config + tags: + tag1: + - 'host1:1111' + - 'host2:2222' + users: + - name: Jerry + config: + celeborn.test.int.only: 10 + # Tags should be null for tenant/user config + tags: + tag1: + - 'host1:1111' + - 'host2:2222' \ No newline at end of file diff --git a/service/src/test/resources/dynamicConfig_tags2.yaml b/service/src/test/resources/dynamicConfig_tags2.yaml new file mode 100644 index 000000000..385425b01 --- /dev/null +++ b/service/src/test/resources/dynamicConfig_tags2.yaml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.test.int.only: 100 + tags: + tag1: + - 'host1:1111' + tag3: + - 'host5:5555' + +- tenantId: tenant_id + level: TENANT + config: + celeborn.test.int.only: 50 + users: + - name: tenant1 + config: + celeborn.test.int.only: 10 \ No newline at end of file