[CELEBORN-1550] Add support of providing custom dynamic store backend implementation
### What changes were proposed in this pull request? Adding support of providing custom dynamic store backend implementation, users can now pass there own implementation for dynamic config store backend. This change also keep the backwards compatibility of supporting short names for backend like "FS" and "DB" ### Why are the changes needed? Currently celeborn only supports File and DB based backend while there can be other ways of managing these configs. ### Does this PR introduce _any_ user-facing change? NO, user facing behaviour will be same. ### How was this patch tested? Existing UTs verifies that this change is working for "FS" and "DB" implementation. Closes #2670 from s0nskar/dynamic_config. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
parent
59b88beb62
commit
a0b04d0036
@ -5253,12 +5253,12 @@ object CelebornConf extends Logging {
|
||||
val DYNAMIC_CONFIG_STORE_BACKEND: OptionalConfigEntry[String] =
|
||||
buildConf("celeborn.dynamicConfig.store.backend")
|
||||
.categories("master", "worker")
|
||||
.doc("Store backend for dynamic config service. Available options: FS, DB. " +
|
||||
.doc("Store backend for dynamic config service. The backend can be specified in two ways:" +
|
||||
" - Using short names: Default available options are FS, DB." +
|
||||
" - Using the fully qualified class name of the backend implementation." +
|
||||
"If not provided, it means that dynamic configuration is disabled.")
|
||||
.version("0.4.0")
|
||||
.stringConf
|
||||
.transform(_.toUpperCase(Locale.ROOT))
|
||||
.checkValues(Set("FS", "DB"))
|
||||
.createOptional
|
||||
|
||||
val DYNAMIC_CONFIG_REFRESH_INTERVAL: ConfigEntry[Long] =
|
||||
|
||||
@ -586,6 +586,19 @@ object Utils extends Logging {
|
||||
}
|
||||
}
|
||||
|
||||
def instantiateDynamicConfigStoreBackend[T](className: String, conf: CelebornConf): T = {
|
||||
try {
|
||||
DynConstructors.builder().impl(className, classOf[CelebornConf])
|
||||
.build[T]()
|
||||
.newInstance(conf)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
throw new CelebornException(
|
||||
s"Failed to instantiate dynamic config store backend $className.",
|
||||
e)
|
||||
}
|
||||
}
|
||||
|
||||
def getCodeSourceLocation(clazz: Class[_]): String = {
|
||||
new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI).getPath
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ license: |
|
||||
| --- | ------- | --------- | ----------- | ----- | ---------- |
|
||||
| celeborn.cluster.name | default | false | Celeborn cluster name. | 0.5.0 | |
|
||||
| celeborn.dynamicConfig.refresh.interval | 120s | false | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | |
|
||||
| celeborn.dynamicConfig.store.backend | <undefined> | false | Store backend for dynamic config service. Available options: FS, DB. If not provided, it means that dynamic configuration is disabled. | 0.4.0 | |
|
||||
| celeborn.dynamicConfig.store.backend | <undefined> | false | Store backend for dynamic config service. The backend can be specified in two ways: - Using short names: Default available options are FS, DB. - Using the fully qualified class name of the backend implementation.If not provided, it means that dynamic configuration is disabled. | 0.4.0 | |
|
||||
| celeborn.dynamicConfig.store.db.fetch.pageSize | 1000 | false | The page size for db store to query configurations. | 0.5.0 | |
|
||||
| celeborn.dynamicConfig.store.db.hikari.connectionTimeout | 30s | false | The connection timeout that a client will wait for a connection from the pool for db store backend. | 0.5.0 | |
|
||||
| celeborn.dynamicConfig.store.db.hikari.driverClassName | | false | The jdbc driver class name of db store backend. | 0.5.0 | |
|
||||
|
||||
@ -21,7 +21,7 @@ license: |
|
||||
| --- | ------- | --------- | ----------- | ----- | ---------- |
|
||||
| celeborn.cluster.name | default | false | Celeborn cluster name. | 0.5.0 | |
|
||||
| celeborn.dynamicConfig.refresh.interval | 120s | false | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | |
|
||||
| celeborn.dynamicConfig.store.backend | <undefined> | false | Store backend for dynamic config service. Available options: FS, DB. If not provided, it means that dynamic configuration is disabled. | 0.4.0 | |
|
||||
| celeborn.dynamicConfig.store.backend | <undefined> | false | Store backend for dynamic config service. The backend can be specified in two ways: - Using short names: Default available options are FS, DB. - Using the fully qualified class name of the backend implementation.If not provided, it means that dynamic configuration is disabled. | 0.4.0 | |
|
||||
| celeborn.dynamicConfig.store.db.fetch.pageSize | 1000 | false | The page size for db store to query configurations. | 0.5.0 | |
|
||||
| celeborn.dynamicConfig.store.db.hikari.connectionTimeout | 30s | false | The connection timeout that a client will wait for a connection from the pool for db store backend. | 0.5.0 | |
|
||||
| celeborn.dynamicConfig.store.db.hikari.driverClassName | | false | The jdbc driver class name of db store backend. | 0.5.0 | |
|
||||
|
||||
@ -46,8 +46,9 @@ If tenant-user-level configuration is missing, it will fall back to the tenant-l
|
||||
## Config Service
|
||||
The config service provides a configuration management service with a local cache for both static and dynamic configurations. Moreover, `ConfigService` is
|
||||
a pluggable service interface whose implementation can vary based on different storage backends. The storage backend for `ConfigService` is specified by the
|
||||
configuration key `celeborn.dynamicConfig.store.backend`, and it currently supports both filesystem (`FS`) and database (`DB`) as storage backends. If no
|
||||
storage backend is specified, this indicates that the config service is disabled.
|
||||
configuration key `celeborn.dynamicConfig.store.backend`, and it currently supports filesystem (`FS`) and database (`DB`) as storage backends by default.
|
||||
Additionally, users can provide their own implementation by extending the `ConfigService` interface and using the fully qualified class name of the implementation
|
||||
as storage backend. If no storage backend is specified, this indicates that the config service is disabled.
|
||||
|
||||
### FileSystem Config Service
|
||||
The filesystem config service enables the use of dynamic configuration files, the location of which is set by the configuration key `celeborn.dynamicConfig.store.fs.path`.
|
||||
|
||||
@ -110,4 +110,11 @@ public interface ConfigService {
|
||||
|
||||
/** Shutdowns configuration management service. */
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Retrieves the name of the configuration service.
|
||||
*
|
||||
* @return The name of the configuration service
|
||||
*/
|
||||
String getName();
|
||||
}
|
||||
|
||||
@ -57,6 +57,11 @@ public class DbConfigServiceImpl extends BaseConfigServiceImpl implements Config
|
||||
Function.identity())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "DB";
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public IServiceManager getServiceManager() {
|
||||
return iServiceManager;
|
||||
|
||||
@ -18,12 +18,24 @@
|
||||
package org.apache.celeborn.server.common.service.config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
|
||||
import org.apache.celeborn.common.CelebornConf;
|
||||
import org.apache.celeborn.common.util.Utils;
|
||||
|
||||
public class DynamicConfigServiceFactory {
|
||||
private static volatile ConfigService _INSTANCE;
|
||||
|
||||
// short names for dynamic config store backends
|
||||
private static final HashMap<String, String> dynamicConfigStoreBackendShortNames =
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("FS", FsConfigServiceImpl.class.getName());
|
||||
put("DB", DbConfigServiceImpl.class.getName());
|
||||
}
|
||||
};
|
||||
|
||||
public static ConfigService getConfigService(CelebornConf celebornConf) throws IOException {
|
||||
if (celebornConf.dynamicConfigStoreBackend().isEmpty()) {
|
||||
return null;
|
||||
@ -32,12 +44,13 @@ public class DynamicConfigServiceFactory {
|
||||
if (_INSTANCE == null) {
|
||||
synchronized (DynamicConfigServiceFactory.class) {
|
||||
if (_INSTANCE == null) {
|
||||
String configStoreBackend = celebornConf.dynamicConfigStoreBackend().get();
|
||||
if ("FS".equals(configStoreBackend)) {
|
||||
_INSTANCE = new FsConfigServiceImpl(celebornConf);
|
||||
} else {
|
||||
_INSTANCE = new DbConfigServiceImpl(celebornConf);
|
||||
}
|
||||
String configStoreBackendName = celebornConf.dynamicConfigStoreBackend().get();
|
||||
String configStoreBackendClass =
|
||||
dynamicConfigStoreBackendShortNames.getOrDefault(
|
||||
configStoreBackendName.toUpperCase(Locale.ROOT), configStoreBackendName);
|
||||
|
||||
_INSTANCE =
|
||||
Utils.instantiateDynamicConfigStoreBackend(configStoreBackendClass, celebornConf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -104,4 +104,9 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements Config
|
||||
}
|
||||
return configFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "FS";
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user