[KYUUBI #4145] Change lock and polling seq_num path on service discovery
### _Why are the changes needed?_
This PR proposes to change the paths of distributed lock and seq_num(used for POLLING engine pool select policy) on the Service Discovery component. The reason is that namespace `${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/` should be dedicated to engine registration, we'd better use the separated namespace for other functionalities.
- lock path
```
# before
${serverSpace}_${shareLevel}_${engineType}/lock/${user}/${subdomain}
# after
${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock/${user}/${subdomain}
```
- seq_num
```
# before
${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}/seq_num/${user}/${poolName}
# after
${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum/${user}/${poolName}
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4145 from pan3793/namespace.
Closes #4145
c912b3f66 [Cheng Pan] name
3326b9b95 [Cheng Pan] name
10083db0b [Cheng Pan] Change lock and polloing seq_num path on service discovery
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
5318585550
commit
547e5ca617
@ -1807,7 +1807,7 @@ object KyuubiConf {
|
||||
.intConf
|
||||
.createWithDefault(-1)
|
||||
|
||||
val ENGINE_POOL_BALANCE_POLICY: ConfigEntry[String] =
|
||||
val ENGINE_POOL_SELECT_POLICY: ConfigEntry[String] =
|
||||
buildConf("kyuubi.engine.pool.selectPolicy")
|
||||
.doc("The select policy of an engine from the corresponding engine pool engine for " +
|
||||
"a session. <ul>" +
|
||||
|
||||
@ -90,7 +90,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def createClient(): Unit = {
|
||||
override def createClient(): Unit = {
|
||||
client = buildClient()
|
||||
kvClient = client.getKVClient()
|
||||
lockClient = client.getLockClient()
|
||||
@ -99,13 +99,13 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
leaseTTL = conf.get(HighAvailabilityConf.HA_ETCD_LEASE_TIMEOUT) / 1000
|
||||
}
|
||||
|
||||
def closeClient(): Unit = {
|
||||
override def closeClient(): Unit = {
|
||||
if (client != null) {
|
||||
client.close()
|
||||
}
|
||||
}
|
||||
|
||||
def create(path: String, mode: String, createParent: Boolean = true): String = {
|
||||
override def create(path: String, mode: String, createParent: Boolean = true): String = {
|
||||
// createParent can not effect here
|
||||
mode match {
|
||||
case "PERSISTENT" => kvClient.put(
|
||||
@ -116,7 +116,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
path
|
||||
}
|
||||
|
||||
def getData(path: String): Array[Byte] = {
|
||||
override def getData(path: String): Array[Byte] = {
|
||||
val response = kvClient.get(ByteSequence.from(path.getBytes())).get()
|
||||
if (response.getKvs.isEmpty) {
|
||||
throw new KyuubiException(s"Key[$path] not exists in ETCD, please check it.")
|
||||
@ -125,12 +125,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def setData(path: String, data: Array[Byte]): Boolean = {
|
||||
override def setData(path: String, data: Array[Byte]): Boolean = {
|
||||
val response = kvClient.put(ByteSequence.from(path.getBytes), ByteSequence.from(data)).get()
|
||||
response != null
|
||||
}
|
||||
|
||||
def getChildren(path: String): List[String] = {
|
||||
override def getChildren(path: String): List[String] = {
|
||||
val kvs = kvClient.get(
|
||||
ByteSequence.from(path.getBytes()),
|
||||
GetOption.newBuilder().isPrefix(true).build()).get().getKvs
|
||||
@ -142,25 +142,25 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def pathExists(path: String): Boolean = {
|
||||
override def pathExists(path: String): Boolean = {
|
||||
!pathNonExists(path)
|
||||
}
|
||||
|
||||
def pathNonExists(path: String): Boolean = {
|
||||
override def pathNonExists(path: String): Boolean = {
|
||||
kvClient.get(ByteSequence.from(path.getBytes())).get().getKvs.isEmpty
|
||||
}
|
||||
|
||||
def delete(path: String, deleteChildren: Boolean = false): Unit = {
|
||||
override def delete(path: String, deleteChildren: Boolean = false): Unit = {
|
||||
kvClient.delete(
|
||||
ByteSequence.from(path.getBytes()),
|
||||
DeleteOption.newBuilder().isPrefix(deleteChildren).build()).get()
|
||||
}
|
||||
|
||||
def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
|
||||
override def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
|
||||
// not need with etcd
|
||||
}
|
||||
|
||||
def tryWithLock[T](
|
||||
override def tryWithLock[T](
|
||||
lockPath: String,
|
||||
timeout: Long)(f: => T): T = {
|
||||
// the default unit is millis, covert to seconds.
|
||||
@ -195,7 +195,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def getServerHost(namespace: String): Option[(String, Int)] = {
|
||||
override def getServerHost(namespace: String): Option[(String, Int)] = {
|
||||
// TODO: use last one because to avoid touching some maybe-crashed engines
|
||||
// We need a big improvement here.
|
||||
getServiceNodesInfo(namespace, Some(1), silent = true) match {
|
||||
@ -204,7 +204,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def getEngineByRefId(
|
||||
override def getEngineByRefId(
|
||||
namespace: String,
|
||||
engineRefId: String): Option[(String, Int)] = {
|
||||
getServiceNodesInfo(namespace, silent = true)
|
||||
@ -212,7 +212,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
.map(data => (data.host, data.port))
|
||||
}
|
||||
|
||||
def getServiceNodesInfo(
|
||||
override def getServiceNodesInfo(
|
||||
namespace: String,
|
||||
sizeOpt: Option[Int] = None,
|
||||
silent: Boolean = false): Seq[ServiceNodeInfo] = {
|
||||
@ -241,7 +241,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def registerService(
|
||||
override def registerService(
|
||||
conf: KyuubiConf,
|
||||
namespace: String,
|
||||
serviceDiscovery: ServiceDiscovery,
|
||||
@ -267,7 +267,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def deregisterService(): Unit = {
|
||||
override def deregisterService(): Unit = {
|
||||
// close the EPHEMERAL_SEQUENTIAL node in etcd
|
||||
if (serviceNode != null) {
|
||||
if (serviceNode.lease != LEASE_NULL_VALUE) {
|
||||
@ -278,7 +278,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def postDeregisterService(namespace: String): Boolean = {
|
||||
override def postDeregisterService(namespace: String): Boolean = {
|
||||
if (namespace != null) {
|
||||
delete(DiscoveryPaths.makePath(null, namespace), true)
|
||||
true
|
||||
@ -287,7 +287,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def createAndGetServiceNode(
|
||||
override def createAndGetServiceNode(
|
||||
conf: KyuubiConf,
|
||||
namespace: String,
|
||||
instance: String,
|
||||
@ -297,7 +297,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
def startSecretNode(
|
||||
override def startSecretNode(
|
||||
createMode: String,
|
||||
basePath: String,
|
||||
initData: String,
|
||||
@ -307,7 +307,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
ByteSequence.from(initData.getBytes())).get()
|
||||
}
|
||||
|
||||
def getAndIncrement(path: String, delta: Int = 1): Int = {
|
||||
override def getAndIncrement(path: String, delta: Int = 1): Int = {
|
||||
val lockPath = s"${path}_tmp_for_lock"
|
||||
tryWithLock(lockPath, 60 * 1000) {
|
||||
if (pathNonExists(path)) {
|
||||
|
||||
@ -66,17 +66,17 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
@volatile private var serviceNode: PersistentNode = _
|
||||
private var watcher: DeRegisterWatcher = _
|
||||
|
||||
def createClient(): Unit = {
|
||||
override def createClient(): Unit = {
|
||||
zkClient.start()
|
||||
}
|
||||
|
||||
def closeClient(): Unit = {
|
||||
override def closeClient(): Unit = {
|
||||
if (zkClient != null) {
|
||||
zkClient.close()
|
||||
}
|
||||
}
|
||||
|
||||
def create(path: String, mode: String, createParent: Boolean = true): String = {
|
||||
override def create(path: String, mode: String, createParent: Boolean = true): String = {
|
||||
val builder =
|
||||
if (createParent) zkClient.create().creatingParentsIfNeeded() else zkClient.create()
|
||||
builder
|
||||
@ -84,27 +84,27 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
.forPath(path)
|
||||
}
|
||||
|
||||
def getData(path: String): Array[Byte] = {
|
||||
override def getData(path: String): Array[Byte] = {
|
||||
zkClient.getData.forPath(path)
|
||||
}
|
||||
|
||||
def setData(path: String, data: Array[Byte]): Boolean = {
|
||||
override def setData(path: String, data: Array[Byte]): Boolean = {
|
||||
zkClient.setData().forPath(path, data) != null
|
||||
}
|
||||
|
||||
def getChildren(path: String): List[String] = {
|
||||
override def getChildren(path: String): List[String] = {
|
||||
zkClient.getChildren.forPath(path).asScala.toList
|
||||
}
|
||||
|
||||
def pathExists(path: String): Boolean = {
|
||||
override def pathExists(path: String): Boolean = {
|
||||
zkClient.checkExists().forPath(path) != null
|
||||
}
|
||||
|
||||
def pathNonExists(path: String): Boolean = {
|
||||
override def pathNonExists(path: String): Boolean = {
|
||||
zkClient.checkExists().forPath(path) == null
|
||||
}
|
||||
|
||||
def delete(path: String, deleteChildren: Boolean = false): Unit = {
|
||||
override def delete(path: String, deleteChildren: Boolean = false): Unit = {
|
||||
if (deleteChildren) {
|
||||
zkClient.delete().deletingChildrenIfNeeded().forPath(path)
|
||||
} else {
|
||||
@ -112,7 +112,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
|
||||
override def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
|
||||
zkClient
|
||||
.getConnectionStateListenable.addListener(new ConnectionStateListener {
|
||||
private val isConnected = new AtomicBoolean(false)
|
||||
@ -141,7 +141,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
})
|
||||
}
|
||||
|
||||
def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
|
||||
override def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
|
||||
var lock: InterProcessSemaphoreMutex = null
|
||||
try {
|
||||
try {
|
||||
@ -189,7 +189,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def getServerHost(namespace: String): Option[(String, Int)] = {
|
||||
override def getServerHost(namespace: String): Option[(String, Int)] = {
|
||||
// TODO: use last one because to avoid touching some maybe-crashed engines
|
||||
// We need a big improvement here.
|
||||
getServiceNodesInfo(namespace, Some(1), silent = true) match {
|
||||
@ -198,7 +198,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def getEngineByRefId(
|
||||
override def getEngineByRefId(
|
||||
namespace: String,
|
||||
engineRefId: String): Option[(String, Int)] = {
|
||||
getServiceNodesInfo(namespace, silent = true)
|
||||
@ -206,7 +206,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
.map(data => (data.host, data.port))
|
||||
}
|
||||
|
||||
def getServiceNodesInfo(
|
||||
override def getServiceNodesInfo(
|
||||
namespace: String,
|
||||
sizeOpt: Option[Int] = None,
|
||||
silent: Boolean = false): Seq[ServiceNodeInfo] = {
|
||||
@ -235,7 +235,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def registerService(
|
||||
override def registerService(
|
||||
conf: KyuubiConf,
|
||||
namespace: String,
|
||||
serviceDiscovery: ServiceDiscovery,
|
||||
@ -254,7 +254,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
watchNode()
|
||||
}
|
||||
|
||||
def deregisterService(): Unit = {
|
||||
override def deregisterService(): Unit = {
|
||||
// close the EPHEMERAL_SEQUENTIAL node in zk
|
||||
if (serviceNode != null) {
|
||||
try {
|
||||
@ -268,7 +268,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def postDeregisterService(namespace: String): Boolean = {
|
||||
override def postDeregisterService(namespace: String): Boolean = {
|
||||
if (namespace != null) {
|
||||
try {
|
||||
delete(namespace, true)
|
||||
@ -283,7 +283,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
}
|
||||
|
||||
def createAndGetServiceNode(
|
||||
override def createAndGetServiceNode(
|
||||
conf: KyuubiConf,
|
||||
namespace: String,
|
||||
instance: String,
|
||||
@ -293,7 +293,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
def startSecretNode(
|
||||
override def startSecretNode(
|
||||
createMode: String,
|
||||
basePath: String,
|
||||
initData: String,
|
||||
@ -307,7 +307,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
|
||||
secretNode.start()
|
||||
}
|
||||
|
||||
def getAndIncrement(path: String, delta: Int = 1): Int = {
|
||||
override def getAndIncrement(path: String, delta: Int = 1): Int = {
|
||||
val dai = new DistributedAtomicInteger(zkClient, path, new RetryForever(1000))
|
||||
var atomicVal: AtomicValue[Integer] = null
|
||||
do {
|
||||
|
||||
@ -74,7 +74,7 @@ private[kyuubi] class EngineRef(
|
||||
|
||||
private val enginePoolIgnoreSubdomain: Boolean = conf.get(ENGINE_POOL_IGNORE_SUBDOMAIN)
|
||||
|
||||
private val enginePoolBalancePolicy: String = conf.get(ENGINE_POOL_BALANCE_POLICY)
|
||||
private val enginePoolSelectPolicy: String = conf.get(ENGINE_POOL_SELECT_POLICY)
|
||||
|
||||
// In case the multi kyuubi instances have the small gap of timeout, here we add
|
||||
// a small amount of time for timeout
|
||||
@ -97,12 +97,11 @@ private[kyuubi] class EngineRef(
|
||||
warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to " +
|
||||
s"system threshold $poolThreshold")
|
||||
}
|
||||
val seqNum = enginePoolBalancePolicy match {
|
||||
val seqNum = enginePoolSelectPolicy match {
|
||||
case "POLLING" =>
|
||||
val snPath =
|
||||
DiscoveryPaths.makePath(
|
||||
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType",
|
||||
"seq_num",
|
||||
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum",
|
||||
appUser,
|
||||
clientPoolName)
|
||||
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
|
||||
@ -159,8 +158,7 @@ private[kyuubi] class EngineRef(
|
||||
case _ =>
|
||||
val lockPath =
|
||||
DiscoveryPaths.makePath(
|
||||
s"${serverSpace}_${shareLevel}_$engineType",
|
||||
"lock",
|
||||
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock",
|
||||
appUser,
|
||||
subdomain)
|
||||
discoveryClient.tryWithLock(
|
||||
|
||||
@ -204,7 +204,7 @@ trait EngineRefTests extends KyuubiFunSuite {
|
||||
conf.set(ENGINE_POOL_NAME, pool_name)
|
||||
conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test")
|
||||
conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
|
||||
conf.set(ENGINE_POOL_BALANCE_POLICY, "POLLING")
|
||||
conf.set(ENGINE_POOL_SELECT_POLICY, "POLLING")
|
||||
(0 until (10)).foreach { i =>
|
||||
val engine7 = new EngineRef(conf, user, "grp", id, null)
|
||||
val engineNumber = Integer.parseInt(engine7.subdomain.substring(pool_name.length + 1))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user