From f76834415a30eacbf2bfa702c7ae34ed6c814e7b Mon Sep 17 00:00:00 2001 From: jiaoqingbo <1178404354@qq.com> Date: Sat, 8 Oct 2022 16:05:03 +0800 Subject: [PATCH] [KYUUBI #3504] Extend JDBC URL to support catalog ### _Why are the changes needed?_ fix #3504 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3516 from jiaoqingbo/kyuubi3504. Closes #3504 93fba696 [jiaoqingbo] remove outdated comment af3e6a16 [jiaoqingbo] code review 894877b7 [jiaoqingbo] code review 856b28fc [jiaoqingbo] code review 60876a95 [jiaoqingbo] [KYUUBI #3504] Extend JDBC URL to support catalog Authored-by: jiaoqingbo <1178404354@qq.com> Signed-off-by: Cheng Pan --- .../flink/session/FlinkSessionImpl.scala | 8 +++++ .../spark/session/SparkSessionImpl.scala | 10 ++++-- .../trino/session/TrinoSessionImpl.scala | 31 ++++++++++--------- .../jdbc/hive/JdbcConnectionParams.java | 13 +++++++- .../kyuubi/jdbc/hive/KyuubiConnection.java | 4 +++ .../org/apache/kyuubi/jdbc/hive/Utils.java | 22 +++++++++++-- .../apache/kyuubi/jdbc/hive/UtilsTest.java | 30 ++++++++++++++---- .../kyuubi/session/KyuubiSessionImpl.scala | 1 + 8 files changed, 92 insertions(+), 27 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 7c6107ee3..03d9ce42e 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -54,6 +54,14 @@ class FlinkSessionImpl( override def 
open(): Unit = { executor.openSession(handle.identifier.toString) normalizedConf.foreach { + case ("use:catalog", catalog) => + val tableEnv = sessionContext.getExecutionContext.getTableEnvironment + try { + tableEnv.useCatalog(catalog) + } catch { + case NonFatal(e) => + throw e + } case ("use:database", database) => val tableEnv = sessionContext.getExecutionContext.getTableEnvironment try { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 4985b95ac..eb4c84e24 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -51,6 +51,13 @@ class SparkSessionImpl( override def open(): Unit = { normalizedConf.foreach { + case ("use:catalog", catalog) => + try { + SparkCatalogShim().setCurrentCatalog(spark, catalog) + } catch { + case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") => + warn(e.getMessage()) + } case ("use:database", database) => try { SparkCatalogShim().setCurrentDatabase(spark, database) @@ -58,9 +65,6 @@ class SparkSessionImpl( case e if database == "default" && e.getMessage != null && e.getMessage.contains("not found") => - // use:database is from hive so the catalog is always session catalog which must have - // default namespace `default`. But as spark support v2 catalog, catalog may not have - // default namespace. Here we do nothing for compatible both session and v2 catalog. 
} case (key, value) => setModifiableConfig(key, value) } diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala index ac7c246e8..a19d74d58 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala @@ -19,9 +19,7 @@ package org.apache.kyuubi.engine.trino.session import java.net.URI import java.time.ZoneId -import java.util.Collections -import java.util.Locale -import java.util.Optional +import java.util.{Collections, Locale, Optional} import java.util.concurrent.TimeUnit import io.airlift.units.Duration @@ -36,8 +34,7 @@ import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement} import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent import org.apache.kyuubi.events.EventBus import org.apache.kyuubi.operation.{Operation, OperationHandle} -import org.apache.kyuubi.session.AbstractSession -import org.apache.kyuubi.session.SessionManager +import org.apache.kyuubi.session.{AbstractSession, SessionManager} class TrinoSessionImpl( protocol: TProtocolVersion, @@ -50,32 +47,38 @@ class TrinoSessionImpl( var trinoContext: TrinoContext = _ private var clientSession: ClientSession = _ + private var catalogName: String = null + private var databaseName: String = null private val sessionEvent = TrinoSessionEvent(this) override def open(): Unit = { normalizedConf.foreach { - case ("use:database", database) => clientSession = createClientSession(database) + case ("use:catalog", catalog) => catalogName = catalog + case ("use:database", database) => databaseName = database case _ => // do nothing } val httpClient = new OkHttpClient.Builder().build() - if (clientSession == null) { - clientSession = 
createClientSession() - } + clientSession = createClientSession() trinoContext = TrinoContext(httpClient, clientSession) super.open() EventBus.post(sessionEvent) } - private def createClientSession(schema: String = null): ClientSession = { + private def createClientSession(): ClientSession = { val sessionConf = sessionManager.getConf val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse( throw KyuubiSQLException("Trino server url can not be null!")) - val catalog = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse( - throw KyuubiSQLException("Trino default catalog can not be null!")) + + if (catalogName == null) { + catalogName = sessionConf.get( + KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse( + throw KyuubiSQLException("Trino default catalog can not be null!")) + } + val user = sessionConf .getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser) val clientRequestTimeout = sessionConf.get(TrinoConf.CLIENT_REQUEST_TIMEOUT) @@ -88,8 +91,8 @@ class TrinoSessionImpl( Optional.empty(), Collections.emptySet(), null, - catalog, - schema, + catalogName, + databaseName, null, ZoneId.systemDefault(), Locale.getDefault, diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java index 670b7a243..385a9d6d6 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java @@ -27,7 +27,8 @@ public class JdbcConnectionParams { // Prefer using a shorter camelCase param name instead of using the same name as the // corresponding // HiveServer2 config. 
- // For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list, + // For a jdbc url: + // jdbc:hive2://<host>:<port>/catalogName/dbName;sess_var_list?hive_conf_list#hive_var_list, // client side params are specified in sess_var_list // Client param names: @@ -115,6 +116,7 @@ public class JdbcConnectionParams { private String host = null; private int port = 0; private String jdbcUriString; + private String catalogName; private String dbName = Utils.DEFAULT_DATABASE; private Map hiveConfs = new LinkedHashMap<>(); private Map hiveVars = new LinkedHashMap<>(); @@ -130,6 +132,7 @@ public class JdbcConnectionParams { this.host = params.host; this.port = params.port; this.jdbcUriString = params.jdbcUriString; + this.catalogName = params.catalogName; this.dbName = params.dbName; this.hiveConfs.putAll(params.hiveConfs); this.hiveVars.putAll(params.hiveVars); @@ -152,6 +155,10 @@ public class JdbcConnectionParams { return jdbcUriString; } + public String getCatalogName() { + return catalogName; + } + public String getDbName() { return dbName; } @@ -196,6 +203,10 @@ public class JdbcConnectionParams { this.jdbcUriString = jdbcUriString; } + public void setCatalogName(String catalogName) { + this.catalogName = catalogName; + } + public void setDbName(String dbName) { this.dbName = dbName; } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java index f39bef7aa..c5e80efe3 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java @@ -705,6 +705,10 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable { for (Entry hiveVar : connParams.getHiveVars().entrySet()) { openConf.put("set:hivevar:" + hiveVar.getKey(), hiveVar.getValue()); } + // switch the catalog + if (connParams.getCatalogName() != null) { + 
openConf.put("use:catalog", connParams.getCatalogName()); + } // switch the database openConf.put("use:database", connParams.getDbName()); // set the fetchSize diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java index 5309cdfc6..886daeb71 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/Utils.java @@ -152,14 +152,27 @@ public class Utils { String sessVars = jdbcURI.getPath(); if ((sessVars != null) && !sessVars.isEmpty()) { String dbName = ""; + String catalogName = ""; // removing leading '/' returned by getPath() sessVars = sessVars.substring(1); if (!sessVars.contains(";")) { - // only dbname is provided - dbName = sessVars; + if (sessVars.contains("/")) { + catalogName = sessVars.substring(0, sessVars.indexOf('/')); + dbName = sessVars.substring(sessVars.indexOf('/') + 1); + } else { + // only dbname is provided + dbName = sessVars; + } } else { // we have dbname followed by session parameters - dbName = sessVars.substring(0, sessVars.indexOf(';')); + String catalogAndDb = sessVars.substring(0, sessVars.indexOf(';')); + if (catalogAndDb.contains("/")) { + catalogName = catalogAndDb.substring(0, catalogAndDb.indexOf('/')); + dbName = catalogAndDb.substring(catalogAndDb.indexOf('/') + 1); + } else { + // only dbname is provided + dbName = catalogAndDb; + } sessVars = sessVars.substring(sessVars.indexOf(';') + 1); Matcher sessMatcher = pattern.matcher(sessVars); while (sessMatcher.find()) { @@ -169,6 +182,9 @@ public class Utils { } } } + if (!catalogName.isEmpty()) { + connParams.setCatalogName(catalogName); + } if (!dbName.isEmpty()) { connParams.setDbName(dbName); } diff --git a/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/UtilsTest.java b/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/UtilsTest.java index c87b463c3..c890c8731 100644 --- 
a/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/UtilsTest.java +++ b/kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/UtilsTest.java @@ -33,23 +33,39 @@ public class UtilsTest { private String expectedHost; private String expectedPort; + private String expectedCatalog; + private String expectedDb; private String uri; @Parameterized.Parameters public static Collection data() { return Arrays.asList( new String[][] { - {"localhost", "10009", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"}, - {"localhost", "10009", "jdbc:hive2:///"}, - {"localhost", "10009", "jdbc:kyuubi://"}, - {"localhost", "10009", "jdbc:hive2://"}, - {"hostname", "10018", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"} + {"localhost", "10009", null, "db", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"}, + {"localhost", "10009", null, "default", "jdbc:hive2:///"}, + {"localhost", "10009", null, "default", "jdbc:kyuubi://"}, + {"localhost", "10009", null, "default", "jdbc:hive2://"}, + {"hostname", "10018", null, "db", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"}, + { + "hostname", + "10018", + "catalog", + "db", + "jdbc:hive2://hostname:10018/catalog/db;k1=v1?k2=v2#k3=v3" + } }); } - public UtilsTest(String expectedHost, String expectedPort, String uri) { + public UtilsTest( + String expectedHost, + String expectedPort, + String expectedCatalog, + String expectedDb, + String uri) { this.expectedHost = expectedHost; this.expectedPort = expectedPort; + this.expectedCatalog = expectedCatalog; + this.expectedDb = expectedDb; this.uri = uri; } @@ -58,5 +74,7 @@ public class UtilsTest { JdbcConnectionParams jdbcConnectionParams1 = extractURLComponents(uri, new Properties()); assertEquals(expectedHost, jdbcConnectionParams1.getHost()); assertEquals(Integer.parseInt(expectedPort), jdbcConnectionParams1.getPort()); + assertEquals(expectedCatalog, jdbcConnectionParams1.getCatalogName()); + assertEquals(expectedDb, jdbcConnectionParams1.getDbName()); } } diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index edbb5c45e..2f96c20f5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -63,6 +63,7 @@ class KyuubiSessionImpl( // TODO: needs improve the hardcode optimizedConf.foreach { + case ("use:catalog", _) => case ("use:database", _) => case ("kyuubi.engine.pool.size.threshold", _) => case (key, value) => sessionConf.set(key, value)