[KYUUBI #3910] Add delta parameter for getAndIncrement method

### _Why are the changes needed?_
getAndIncrement method only increase index before this pr,  add a delta parameter can increase and decrease index. this pr is mainly for pr KYUUBI #3695

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3910 from CHzxp/getAndIncrement_branch.

Closes #3910

5c0cca85 [CHzxp] fix bug
3514471e [CHzxp] modify imp
f1ae57d6 [xinping] add delta parameter for getAndIncrement

Lead-authored-by: CHzxp <1959044956@qq.com>
Co-authored-by: xinping <jXD8_1uG>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
CHzxp 2022-12-06 16:07:40 +08:00 committed by Cheng Pan
parent ade2a53cc3
commit 2a903b01fe
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 6 additions and 5 deletions

View File

@ -174,9 +174,10 @@ trait DiscoveryClient extends Logging {
* Atomically get an Int number and add one
* @param path the path of stored data,
* If the path does not exist, it will be created and initialized to 0
* @param delta the increase num
* @return the stored data under path
*/
def getAndIncrement(path: String): Int
def getAndIncrement(path: String, delta: Int = 1): Int
}
object DiscoveryClient {

View File

@ -303,7 +303,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
ByteSequence.from(initData.getBytes())).get()
}
def getAndIncrement(path: String): Int = {
def getAndIncrement(path: String, delta: Int = 1): Int = {
val lockPath = s"${path}_tmp_for_lock"
tryWithLock(lockPath, 60 * 1000) {
if (pathNonExists(path)) {
@ -311,7 +311,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
setData(path, String.valueOf(0).getBytes)
}
val s = new String(getData(path)).toInt
setData(path, String.valueOf(s + 1).getBytes)
setData(path, String.valueOf(s + delta).getBytes)
s
}
}

View File

@ -304,11 +304,11 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
secretNode.start()
}
def getAndIncrement(path: String): Int = {
def getAndIncrement(path: String, delta: Int = 1): Int = {
val dai = new DistributedAtomicInteger(zkClient, path, new RetryForever(1000))
var atomicVal: AtomicValue[Integer] = null
do {
atomicVal = dai.increment()
atomicVal = dai.add(delta)
} while (atomicVal == null || !atomicVal.succeeded())
atomicVal.preValue().intValue()
}