[KYUUBI #6034] Kyuubi Server HA&ZK get server from serverHosts support more strategy
# 🔍 Description ## Issue References 🔗 This pull request fixes #6034 ## Describe Your Solution 🔧 Currently, use beeline to connect kyuubiServer with HA mode, the strategy only support random, this will lead to a high load on the machine. So i make this pr to support choose strategy. [description] First, we need know, beeline connect kyuubiServer dependency on kyuubi-hive-jdbc, it is isolated from the kyuubi cluster, so the code only support random choose serverHost from zk node /${namespace}. Because kyuubi-hive-jdbc is a stateless module, only run once, cannot store var about get serverHost from zk node. [Solution] This pr, we could implement a interface named ChooseServerStrategy to choose serverHost. I implement two strategy 1. poll: it will create a zk node named ${namespace}-counter, when a beeline client want connect kyuubiServer, the node will increment 1, use this value to take the remainder from serverHosts, like counter % serverHost.size, so we could get a order serverHost 2. random: random get serverHost from serverHosts 3. User Definied Class: implemented the ChooseServerStrategy, then put the jar to beeline-jars, it can use your strategy to choose serverHost ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 Test the Strategy in my test Cluster #### Behavior Without This Pull Request ⚰️    #### Behavior With This Pull Request 🎉 [Use Case] 1. poll: `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=poll?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true` 2. random: `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=random?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true` or `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true` 3. YourStrategy: `bin/beeline -u 'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=xxx.xxx.xxx.XxxChooseServerStrategy?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true' -n mfw_hadoop --verbose=true --showNestedErrs=true` [Result: The Cluster have two Server (221,233)] 1. poll: 1.1. zkNode: counterValue  1.2. result:     2. random:    3. YourStrategy(the test case only get the first serverHost):    #### Related Unit Tests There is no Unit Tests. --- # Checklist 📝 - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6213 from davidyuan1223/ha_zk_support_more_strategy. Closes #6034 961d3e989 [Bowen Liang] rename ServerStrategyFactory to ServerSelectStrategyFactory 353f94059 [Bowen Liang] repeat 8822ad471 [Bowen Liang] repeat 619339402 [Bowen Liang] nit e94f9e909 [Bowen Liang] nit 40f427ae5 [Bowen Liang] rename StrategyFactory to StrategyFactoryServerStrategyFactory 7668f99cc [Bowen Liang] test name e194ea62f [Bowen Liang] remove ZooKeeperHiveClientException from method signature of chooseServer 265965e5d [Bowen Liang] polling b39c56700 [Bowen Liang] style 1ab79b494 [Bowen Liang] strategyName 8f8ca28f2 [Bowen Liang] nit 228bf1091 [Bowen Liang] rename parameter zooKeeperStrategy to serverSelectStrategy 125c82358 [Bowen Liang] rename ChooseServerStrategy to ServerSelectStrategy b4aeb3dbd [Bowen Liang] repeat testing on pollingChooseStrategy 465548005 [davidyuan] update 09a84f1f9 [david yuan] remove the distirbuted lock 93f4a2699 [davidyuan] remove reset 7b0c1b811 [davidyuan] fix var not valid and counter getAndIncrement c95382a23 [davidyuan] fix var not valid and counter getAndIncrement 9ed2cac85 [david yuan] remove test comment 8eddd7682 [davidyuan] Add Strategy Unit Test Case and fix the polling strategy counter begin with 0 73952f878 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy 97b959776 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy ee5a9ad68 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy 6a0445357 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts support more strategy 1892f148d [davidyuan] add common method to get session level config 7c0c6058d [yuanfuyuan] fix_4186 Lead-authored-by: Bowen Liang <liangbowen@gf.com.cn> Co-authored-by: davidyuan <yuanfuyuan@mafengwo.com> Co-authored-by: davidyuan <davidyuan1223@gmail.com> Co-authored-by: david yuan <davidyuan1223@gmail.com> Co-authored-by: yuanfuyuan <1406957364@qq.com> Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
This commit is contained in:
parent
e446724488
commit
8862767827
@ -132,6 +132,13 @@
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kyuubi</groupId>
|
||||
<artifactId>kyuubi-hive-jdbc</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@ -34,8 +34,9 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client._
|
||||
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
|
||||
import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider._
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.{ServerSelectStrategy, ServerSelectStrategyFactory}
|
||||
import org.apache.kyuubi.service._
|
||||
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
|
||||
import org.apache.kyuubi.shaded.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
|
||||
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
|
||||
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
|
||||
import org.apache.kyuubi.shaded.zookeeper.data.ACL
|
||||
@ -227,4 +228,41 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
|
||||
discovery.stop()
|
||||
}
|
||||
}
|
||||
|
||||
test("server select strategy with zookeeper") {
|
||||
val zkClient = CuratorFrameworkFactory.builder()
|
||||
.connectString(getConnectString)
|
||||
.sessionTimeoutMs(5000)
|
||||
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
|
||||
.build
|
||||
zkClient.start()
|
||||
|
||||
val namespace = "kyuubi-strategy-test"
|
||||
val testServerHosts = Seq(
|
||||
"testNode1",
|
||||
"testNode2",
|
||||
"testNode3").asJava
|
||||
// test polling strategy
|
||||
val pollingStrategy = ServerSelectStrategyFactory.createStrategy("polling")
|
||||
1 to testServerHosts.size() * 2 foreach { _ =>
|
||||
assertResult(f"testNode1")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
|
||||
assertResult(f"testNode2")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
|
||||
assertResult(f"testNode3")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
|
||||
}
|
||||
|
||||
// test only get first serverHost strategy
|
||||
val customStrategy = new ServerSelectStrategy {
|
||||
override def chooseServer(
|
||||
serverHosts: util.List[String],
|
||||
zkClient: CuratorFramework,
|
||||
namespace: String): String = serverHosts.get(0)
|
||||
}
|
||||
1 to testServerHosts.size() * 2 foreach { _ =>
|
||||
assertResult("testNode1") {
|
||||
customStrategy.chooseServer(testServerHosts, zkClient, namespace)
|
||||
}
|
||||
}
|
||||
|
||||
zkClient.close()
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,6 +79,7 @@ public class JdbcConnectionParams {
|
||||
// Use ZooKeeper for indirection while using dynamic service discovery
|
||||
static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
|
||||
static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
|
||||
static final String SERVER_SELECT_STRATEGY = "serverSelectStrategy";
|
||||
// Default namespace value on ZooKeeper.
|
||||
// This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
|
||||
static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
|
||||
|
||||
@ -22,9 +22,11 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategyFactory;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;
|
||||
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
|
||||
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry;
|
||||
@ -111,7 +113,7 @@ class ZooKeeperHiveClientHelper {
|
||||
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
|
||||
List<String> serverHosts = getServerHosts(connParams, zooKeeperClient);
|
||||
// Now pick a server node randomly
|
||||
String serverNode = serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
|
||||
String serverNode = chooseServer(connParams, serverHosts, zooKeeperClient);
|
||||
updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode);
|
||||
} catch (Exception e) {
|
||||
throw new ZooKeeperHiveClientException(
|
||||
@ -120,6 +122,22 @@ class ZooKeeperHiveClientHelper {
|
||||
// Close the client connection with ZooKeeper
|
||||
}
|
||||
|
||||
private static String chooseServer(
|
||||
JdbcConnectionParams connParams, List<String> serverHosts, CuratorFramework zkClient) {
|
||||
String zooKeeperNamespace = getZooKeeperNamespace(connParams);
|
||||
String strategyName =
|
||||
connParams
|
||||
.getSessionVars()
|
||||
.getOrDefault(
|
||||
JdbcConnectionParams.SERVER_SELECT_STRATEGY, RandomSelectStrategy.strategyName);
|
||||
try {
|
||||
ServerSelectStrategy strategy = ServerSelectStrategyFactory.createStrategy(strategyName);
|
||||
return strategy.chooseServer(serverHosts, zkClient, zooKeeperNamespace);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to choose server with strategy " + strategyName, e);
|
||||
}
|
||||
}
|
||||
|
||||
static List<JdbcConnectionParams> getDirectParamsList(JdbcConnectionParams connParams)
|
||||
throws ZooKeeperHiveClientException {
|
||||
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.jdbc.hive.strategy;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
|
||||
|
||||
public interface ServerSelectStrategy {
|
||||
String chooseServer(List<String> serverHosts, CuratorFramework zkClient, String namespace);
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.jdbc.hive.strategy;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.zk.PollingSelectStrategy;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;
|
||||
|
||||
public class ServerSelectStrategyFactory {
|
||||
public static ServerSelectStrategy createStrategy(String strategyName) {
|
||||
try {
|
||||
switch (strategyName) {
|
||||
case PollingSelectStrategy.strategyName:
|
||||
return new PollingSelectStrategy();
|
||||
case RandomSelectStrategy.strategyName:
|
||||
return new RandomSelectStrategy();
|
||||
default:
|
||||
Class<?> clazz = Class.forName(strategyName);
|
||||
if (ServerSelectStrategy.class.isAssignableFrom(clazz)) {
|
||||
Constructor<? extends ServerSelectStrategy> constructor =
|
||||
clazz.asSubclass(ServerSelectStrategy.class).getConstructor();
|
||||
return constructor.newInstance();
|
||||
} else {
|
||||
throw new ClassNotFoundException(
|
||||
"The loaded class does not implement ServerSelectStrategy");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to init server select strategy", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.jdbc.hive.strategy.zk;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
|
||||
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
|
||||
import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.AtomicValue;
|
||||
import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.DistributedAtomicInteger;
|
||||
import org.apache.kyuubi.shaded.curator.retry.RetryForever;
|
||||
|
||||
public class PollingSelectStrategy implements ServerSelectStrategy {
|
||||
public static final String strategyName = "polling";
|
||||
|
||||
private static final String COUNTER_PATH_PREFIX = "/";
|
||||
private static final String COUNTER_PATH_SUFFIX = "-counter";
|
||||
|
||||
@Override
|
||||
public String chooseServer(
|
||||
List<String> serverHosts, CuratorFramework zkClient, String namespace) {
|
||||
String counterPath = COUNTER_PATH_PREFIX + namespace + COUNTER_PATH_SUFFIX;
|
||||
try {
|
||||
return serverHosts.get(getAndIncrement(zkClient, counterPath) % serverHosts.size());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to choose server by polling select strategy", e);
|
||||
}
|
||||
}
|
||||
|
||||
private int getAndIncrement(CuratorFramework zkClient, String path) throws Exception {
|
||||
DistributedAtomicInteger dai =
|
||||
new DistributedAtomicInteger(zkClient, path, new RetryForever(3000));
|
||||
AtomicValue<Integer> atomicVal;
|
||||
do {
|
||||
atomicVal = dai.add(1);
|
||||
} while (atomicVal == null || !atomicVal.succeeded());
|
||||
return atomicVal.preValue();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.jdbc.hive.strategy.zk;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
|
||||
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
|
||||
|
||||
public class RandomSelectStrategy implements ServerSelectStrategy {
|
||||
public static final String strategyName = "random";
|
||||
|
||||
@Override
|
||||
public String chooseServer(
|
||||
List<String> serverHosts, CuratorFramework zkClient, String namespace) {
|
||||
return serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user