[KYUUBI #3504] Extend JDBC URL to support catalog

### _Why are the changes needed?_

fix #3504

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3516 from jiaoqingbo/kyuubi3504.

Closes #3504

93fba696 [jiaoqingbo] remove outdated comment
af3e6a16 [jiaoqingbo] code review
894877b7 [jiaoqingbo] code review
856b28fc [jiaoqingbo] code review
60876a95 [jiaoqingbo] [KYUUBI #3504] Extend JDBC URL to support catalog

Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
jiaoqingbo 2022-10-08 16:05:03 +08:00 committed by Cheng Pan
parent fb6516a588
commit f76834415a
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
8 changed files with 92 additions and 27 deletions

View File

@ -54,6 +54,14 @@ class FlinkSessionImpl(
override def open(): Unit = {
executor.openSession(handle.identifier.toString)
normalizedConf.foreach {
case ("use:catalog", catalog) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useCatalog(catalog)
} catch {
case NonFatal(e) =>
throw e
}
case ("use:database", database) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {

View File

@ -51,6 +51,13 @@ class SparkSessionImpl(
override def open(): Unit = {
normalizedConf.foreach {
case ("use:catalog", catalog) =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
warn(e.getMessage())
}
case ("use:database", database) =>
try {
SparkCatalogShim().setCurrentDatabase(spark, database)
@ -58,9 +65,6 @@ class SparkSessionImpl(
case e
if database == "default" && e.getMessage != null &&
e.getMessage.contains("not found") =>
// use:database is from hive so the catalog is always session catalog which must have
// default namespace `default`. But as spark support v2 catalog, catalog may not have
// default namespace. Here we do nothing for compatible both session and v2 catalog.
}
case (key, value) => setModifiableConfig(key, value)
}

View File

@ -19,9 +19,7 @@ package org.apache.kyuubi.engine.trino.session
import java.net.URI
import java.time.ZoneId
import java.util.Collections
import java.util.Locale
import java.util.Optional
import java.util.{Collections, Locale, Optional}
import java.util.concurrent.TimeUnit
import io.airlift.units.Duration
@ -36,8 +34,7 @@ import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.AbstractSession
import org.apache.kyuubi.session.SessionManager
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
class TrinoSessionImpl(
protocol: TProtocolVersion,
@ -50,32 +47,38 @@ class TrinoSessionImpl(
var trinoContext: TrinoContext = _
private var clientSession: ClientSession = _
private var catalogName: String = null
private var databaseName: String = null
private val sessionEvent = TrinoSessionEvent(this)
override def open(): Unit = {
normalizedConf.foreach {
case ("use:database", database) => clientSession = createClientSession(database)
case ("use:catalog", catalog) => catalogName = catalog
case ("use:database", database) => databaseName = database
case _ => // do nothing
}
val httpClient = new OkHttpClient.Builder().build()
if (clientSession == null) {
clientSession = createClientSession()
}
clientSession = createClientSession()
trinoContext = TrinoContext(httpClient, clientSession)
super.open()
EventBus.post(sessionEvent)
}
private def createClientSession(schema: String = null): ClientSession = {
private def createClientSession(): ClientSession = {
val sessionConf = sessionManager.getConf
val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse(
throw KyuubiSQLException("Trino server url can not be null!"))
val catalog = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
throw KyuubiSQLException("Trino default catalog can not be null!"))
if (catalogName == null) {
catalogName = sessionConf.get(
KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
throw KyuubiSQLException("Trino default catalog can not be null!"))
}
val user = sessionConf
.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser)
val clientRequestTimeout = sessionConf.get(TrinoConf.CLIENT_REQUEST_TIMEOUT)
@ -88,8 +91,8 @@ class TrinoSessionImpl(
Optional.empty(),
Collections.emptySet(),
null,
catalog,
schema,
catalogName,
databaseName,
null,
ZoneId.systemDefault(),
Locale.getDefault,

View File

@ -27,7 +27,8 @@ public class JdbcConnectionParams {
// Prefer using a shorter camelCase param name instead of using the same name as the
// corresponding
// HiveServer2 config.
// For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list,
// For a jdbc url:
// jdbc:hive2://<host>:<port>/catalogName/dbName;sess_var_list?hive_conf_list#hive_var_list,
// client side params are specified in sess_var_list
// Client param names:
@ -115,6 +116,7 @@ public class JdbcConnectionParams {
private String host = null;
private int port = 0;
private String jdbcUriString;
private String catalogName;
private String dbName = Utils.DEFAULT_DATABASE;
private Map<String, String> hiveConfs = new LinkedHashMap<>();
private Map<String, String> hiveVars = new LinkedHashMap<>();
@ -130,6 +132,7 @@ public class JdbcConnectionParams {
this.host = params.host;
this.port = params.port;
this.jdbcUriString = params.jdbcUriString;
this.catalogName = params.catalogName;
this.dbName = params.dbName;
this.hiveConfs.putAll(params.hiveConfs);
this.hiveVars.putAll(params.hiveVars);
@ -152,6 +155,10 @@ public class JdbcConnectionParams {
return jdbcUriString;
}
public String getCatalogName() {
return catalogName;
}
public String getDbName() {
return dbName;
}
@ -196,6 +203,10 @@ public class JdbcConnectionParams {
this.jdbcUriString = jdbcUriString;
}
public void setCatalogName(String catalogName) {
this.catalogName = catalogName;
}
public void setDbName(String dbName) {
this.dbName = dbName;
}

View File

@ -705,6 +705,10 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
for (Entry<String, String> hiveVar : connParams.getHiveVars().entrySet()) {
openConf.put("set:hivevar:" + hiveVar.getKey(), hiveVar.getValue());
}
// switch the catalog
if (connParams.getCatalogName() != null) {
openConf.put("use:catalog", connParams.getCatalogName());
}
// switch the database
openConf.put("use:database", connParams.getDbName());
// set the fetchSize

View File

@ -152,14 +152,27 @@ public class Utils {
String sessVars = jdbcURI.getPath();
if ((sessVars != null) && !sessVars.isEmpty()) {
String dbName = "";
String catalogName = "";
// removing leading '/' returned by getPath()
sessVars = sessVars.substring(1);
if (!sessVars.contains(";")) {
// only dbname is provided
dbName = sessVars;
if (sessVars.contains("/")) {
catalogName = sessVars.substring(0, sessVars.indexOf('/'));
dbName = sessVars.substring(sessVars.indexOf('/') + 1);
} else {
// only dbname is provided
dbName = sessVars;
}
} else {
// we have dbname followed by session parameters
dbName = sessVars.substring(0, sessVars.indexOf(';'));
String catalogAndDb = sessVars.substring(0, sessVars.indexOf(';'));
if (catalogAndDb.contains("/")) {
catalogName = catalogAndDb.substring(0, catalogAndDb.indexOf('/'));
dbName = catalogAndDb.substring(catalogAndDb.indexOf('/') + 1);
} else {
// only dbname is provided
dbName = catalogAndDb;
}
sessVars = sessVars.substring(sessVars.indexOf(';') + 1);
Matcher sessMatcher = pattern.matcher(sessVars);
while (sessMatcher.find()) {
@ -169,6 +182,9 @@ public class Utils {
}
}
}
if (!catalogName.isEmpty()) {
connParams.setCatalogName(catalogName);
}
if (!dbName.isEmpty()) {
connParams.setDbName(dbName);
}

View File

@ -33,23 +33,39 @@ public class UtilsTest {
private String expectedHost;
private String expectedPort;
private String expectedCatalog;
private String expectedDb;
private String uri;
@Parameterized.Parameters
public static Collection<String[]> data() {
return Arrays.asList(
new String[][] {
{"localhost", "10009", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"},
{"localhost", "10009", "jdbc:hive2:///"},
{"localhost", "10009", "jdbc:kyuubi://"},
{"localhost", "10009", "jdbc:hive2://"},
{"hostname", "10018", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"}
{"localhost", "10009", null, "db", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"},
{"localhost", "10009", null, "default", "jdbc:hive2:///"},
{"localhost", "10009", null, "default", "jdbc:kyuubi://"},
{"localhost", "10009", null, "default", "jdbc:hive2://"},
{"hostname", "10018", null, "db", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"},
{
"hostname",
"10018",
"catalog",
"db",
"jdbc:hive2://hostname:10018/catalog/db;k1=v1?k2=v2#k3=v3"
}
});
}
public UtilsTest(String expectedHost, String expectedPort, String uri) {
public UtilsTest(
String expectedHost,
String expectedPort,
String expectedCatalog,
String expectedDb,
String uri) {
this.expectedHost = expectedHost;
this.expectedPort = expectedPort;
this.expectedCatalog = expectedCatalog;
this.expectedDb = expectedDb;
this.uri = uri;
}
@ -58,5 +74,7 @@ public class UtilsTest {
JdbcConnectionParams jdbcConnectionParams1 = extractURLComponents(uri, new Properties());
assertEquals(expectedHost, jdbcConnectionParams1.getHost());
assertEquals(Integer.parseInt(expectedPort), jdbcConnectionParams1.getPort());
assertEquals(expectedCatalog, jdbcConnectionParams1.getCatalogName());
assertEquals(expectedDb, jdbcConnectionParams1.getDbName());
}
}

View File

@ -63,6 +63,7 @@ class KyuubiSessionImpl(
// TODO: needs improve the hardcode
optimizedConf.foreach {
case ("use:catalog", _) =>
case ("use:database", _) =>
case ("kyuubi.engine.pool.size.threshold", _) =>
case (key, value) => sessionConf.set(key, value)