[KYUUBI #4522] use:catalog should execute before than use:database

### _Why are the changes needed?_

close #4522

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4648 from lsm1/fix/kyuubi_4522.

Closes #4522

e06046899 [senmiaoliu] use foreach
bd83d6623 [senmiaoliu] spilt narmalizedConf
4d8445aac [senmiaoliu] avoid sort
eda34d480 [senmiaoliu] use catalog first

Authored-by: senmiaoliu <senmiaoliu@trip.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
senmiaoliu 2023-04-04 10:56:43 +08:00 committed by Cheng Pan
parent d7c1c94f23
commit f0796ec078
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 58 additions and 36 deletions

View File

@ -57,25 +57,34 @@ class FlinkSessionImpl(
override def open(): Unit = {
executor.openSession(handle.identifier.toString)
normalizedConf.foreach {
case ("use:catalog", catalog) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useCatalog(catalog)
} catch {
case NonFatal(e) =>
val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
Array("use:catalog", "use:database").contains(k)
}
useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useCatalog(catalog)
} catch {
case NonFatal(e) =>
throw e
}
}
useCatalogAndDatabaseConf.get("use:database").foreach { database =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useDatabase(database)
} catch {
case NonFatal(e) =>
if (database != "default") {
throw e
}
case ("use:database", database) =>
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
tableEnv.useDatabase(database)
} catch {
case NonFatal(e) =>
if (database != "default") {
throw e
}
}
}
}
}
otherConf.foreach {
case (key, value) => setModifiableConfig(key, value)
}
super.open()

View File

@ -54,22 +54,31 @@ class SparkSessionImpl(
private val sessionEvent = SessionEvent(this)
override def open(): Unit = {
normalizedConf.foreach {
case ("use:catalog", catalog) =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
warn(e.getMessage())
}
case ("use:database", database) =>
try {
SparkCatalogShim().setCurrentDatabase(spark, database)
} catch {
case e
if database == "default" && e.getMessage != null &&
e.getMessage.contains("not found") =>
}
val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
Array("use:catalog", "use:database").contains(k)
}
useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
warn(e.getMessage())
}
}
useCatalogAndDatabaseConf.get("use:database").foreach { database =>
try {
SparkCatalogShim().setCurrentDatabase(spark, database)
} catch {
case e
if database == "default" && e.getMessage != null &&
e.getMessage.contains("not found") =>
}
}
otherConf.foreach {
case (key, value) => setModifiableConfig(key, value)
}
KDFRegistry.registerAll(spark)

View File

@ -57,10 +57,14 @@ class TrinoSessionImpl(
private val sessionEvent = TrinoSessionEvent(this)
override def open(): Unit = {
normalizedConf.foreach {
val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) =>
Array("use:catalog", "use:database").contains(k)
}
useCatalogAndDatabaseConf.foreach {
case ("use:catalog", catalog) => catalogName = catalog
case ("use:database", database) => databaseName = database
case _ => // do nothing
}
val httpClient = new OkHttpClient.Builder().build()