diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala index 29c1734f8..77588180e 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala @@ -174,9 +174,10 @@ trait DiscoveryClient extends Logging { * Atomically get an Int number and add one * @param path the path of stored data, * If the path does not exist, it will be created and initialized to 0 + * @param delta the increase num * @return the stored data under path */ - def getAndIncrement(path: String): Int + def getAndIncrement(path: String, delta: Int = 1): Int } object DiscoveryClient { diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala index 727ba5cae..dabdc7063 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala @@ -303,7 +303,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { ByteSequence.from(initData.getBytes())).get() } - def getAndIncrement(path: String): Int = { + def getAndIncrement(path: String, delta: Int = 1): Int = { val lockPath = s"${path}_tmp_for_lock" tryWithLock(lockPath, 60 * 1000) { if (pathNonExists(path)) { @@ -311,7 +311,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { setData(path, String.valueOf(0).getBytes) } val s = new String(getData(path)).toInt - setData(path, String.valueOf(s + 1).getBytes) + setData(path, String.valueOf(s + delta).getBytes) s } } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala index 4802b4f7b..ca46d1130 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala @@ -304,11 +304,11 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { secretNode.start() } - def getAndIncrement(path: String): Int = { + def getAndIncrement(path: String, delta: Int = 1): Int = { val dai = new DistributedAtomicInteger(zkClient, path, new RetryForever(1000)) var atomicVal: AtomicValue[Integer] = null do { - atomicVal = dai.increment() + atomicVal = dai.add(delta) } while (atomicVal == null || !atomicVal.succeeded()) atomicVal.preValue().intValue() }