diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index e85ca79eb..53f8bfc1e 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -63,8 +63,13 @@
spark-hive-thriftserver_${scala.binary.version}
test
-
+
+ org.apache.iceberg
+ ${iceberg.name}
+ test
+
+
target/scala-${scala.binary.version}/classes
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala
index cb9392680..0dbc950af 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala
@@ -66,7 +66,8 @@ class Shim_v3_0 extends Shim_v2_4 {
val catalog = manager.currentCatalog
(getSchemas(catalog, schemaPattern) ++ viewMgr).map(Row(_, catalog.name()))
} else {
- (getSchemas(manager.catalog(catalogName), schemaPattern) ++ viewMgr).map(Row(_, catalogName))
+ val catalogPlugin = manager.catalog(catalogName)
+ (getSchemas(catalogPlugin, schemaPattern) ++ viewMgr).map(Row(_, catalogName))
}
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
index bf8d4f682..ee9c17296 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
@@ -19,35 +19,29 @@ package org.apache.kyuubi.engine.spark
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
import org.apache.spark.sql.SparkSession
-import org.apache.thrift.protocol.TBinaryProtocol
-import org.apache.thrift.transport.TSocket
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.operation.JDBCTests
-import org.apache.kyuubi.service.authentication.PlainSASLHelper
-
-trait WithSparkSQLEngine extends JDBCTests {
-
- val warehousePath = Utils.createTempDir()
- val metastorePath = Utils.createTempDir()
- warehousePath.toFile.delete()
- metastorePath.toFile.delete()
- System.setProperty("javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$metastorePath;create=true")
- System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
- System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
-
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- protected val spark: SparkSession = SparkSQLEngine.createSpark()
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+trait WithSparkSQLEngine extends KyuubiFunSuite {
+ protected var spark: SparkSession = _
protected var engine: SparkSQLEngine = _
protected var connectionUrl: String = _
override def beforeAll(): Unit = {
+ val warehousePath = Utils.createTempDir()
+ val metastorePath = Utils.createTempDir()
+ warehousePath.toFile.delete()
+ metastorePath.toFile.delete()
+ System.setProperty("javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastorePath;create=true")
+ System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
+ System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
+
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ spark = SparkSQLEngine.createSpark()
engine = SparkSQLEngine.startEngine(spark)
connectionUrl = engine.connectionUrl
super.beforeAll()
@@ -58,48 +52,12 @@ trait WithSparkSQLEngine extends JDBCTests {
if (engine != null) {
engine.stop()
}
- spark.stop()
+ if (spark != null) {
+ spark.stop()
+ }
SessionState.detachSession()
Hive.closeCurrent()
}
- protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
-
- protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
- val hostAndPort = connectionUrl.split(":")
- val host = hostAndPort.head
- val port = hostAndPort(1).toInt
- val socket = new TSocket(host, port)
- val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
-
- val protocol = new TBinaryProtocol(transport)
- val client = new TCLIService.Client(protocol)
- transport.open()
- try {
- f(client)
- } finally {
- socket.close()
- }
- }
-
- protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
- withThriftClient { client =>
- val req = new TOpenSessionReq()
- req.setUsername(user)
- req.setPassword("anonymous")
- val resp = client.OpenSession(req)
- val handle = resp.getSessionHandle
-
- try {
- f(client, handle)
- } finally {
- val tCloseSessionReq = new TCloseSessionReq(handle)
- try {
- client.CloseSession(tCloseSessionReq)
- } catch {
- case e: Exception => error(s"Failed to close $handle", e)
- }
- }
- }
- }
+ protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
new file mode 100644
index 000000000..2c8d21c29
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.BasicIcebergJDBCTests
+
+class SparkIcebergOperationSuite extends WithSparkSQLEngine with BasicIcebergJDBCTests {
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ override def beforeAll(): Unit = {
+ for ((k, v) <- icebergConfigs) {
+ System.setProperty(k, v)
+ }
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ for ((k, _) <- icebergConfigs) {
+ System.clearProperty(k)
+ }
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 162790e91..b00049ee6 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -33,9 +33,12 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.JDBCTests
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
-class SparkOperationSuite extends WithSparkSQLEngine {
+class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests {
+
+ override protected def jdbcUrl: String = getJdbcUrl
test("get table types") {
withJdbcStatement() { statement =>
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala
new file mode 100644
index 000000000..d525c5406
--- /dev/null
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.nio.file.Path
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+
+trait BasicIcebergJDBCTests extends JDBCTestUtils {
+
+ protected def catalog: String = "hadoop_prod"
+
+ protected val iceberg: String = {
+ System.getProperty("java.class.path")
+      .split(System.getProperty("path.separator"))
+ .filter(_.contains("iceberg-spark")).head
+ }
+
+ protected val warehouse: Path = Utils.createTempDir()
+
+ protected val icebergConfigs = Map(
+ "spark.sql.defaultCatalog" -> catalog,
+ "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+ "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog",
+ "spark.sql.catalog.spark_catalog.type" -> "hive",
+ s"spark.sql.catalog.$catalog" -> "org.apache.iceberg.spark.SparkCatalog",
+ s"spark.sql.catalog.$catalog.type" -> "hadoop",
+ s"spark.sql.catalog.$catalog.warehouse" -> warehouse.toString,
+ "spark.jars" -> iceberg)
+
+ test("get catalogs") {
+ withJdbcStatement() { statement =>
+ val metaData = statement.getConnection.getMetaData
+ val catalogs = metaData.getCatalogs
+ catalogs.next()
+ assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
+ catalogs.next()
+ assert(catalogs.getString(TABLE_CAT) === catalog)
+ }
+ }
+
+ test("get schemas") {
+ val dbs = Seq("db1", "db2", "db33", "db44")
+ val dbDflts = Seq("default", "global_temp")
+
+ withDatabases(dbs: _*) { statement =>
+ dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
+ val metaData = statement.getConnection.getMetaData
+
+ val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
+
+ // The session catalog
+ allPattern foreach { pattern =>
+ checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
+ }
+
+ Seq(null, catalog).foreach { cg =>
+ allPattern foreach { pattern =>
+ checkGetSchemas(
+ metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog)
+ }
+ }
+
+ Seq("db%", "db.*") foreach { pattern =>
+ checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs, catalog)
+ }
+
+ Seq("db_", "db.") foreach { pattern =>
+ checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs.take(2), catalog)
+ }
+
+ checkGetSchemas(metaData.getSchemas(catalog, "db1"), Seq("db1"), catalog)
+ checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog)
+ }
+ }
+}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
index 9817ea25b..ef8fac66d 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
@@ -20,7 +20,12 @@ package org.apache.kyuubi.operation
import java.sql.{DriverManager, ResultSet, Statement}
import java.util.Locale
+import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.TSocket
+
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.service.authentication.PlainSASLHelper
trait JDBCTestUtils extends KyuubiFunSuite {
@@ -73,6 +78,44 @@ trait JDBCTestUtils extends KyuubiFunSuite {
withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}
+ protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
+ val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
+ val host = hostAndPort.head
+ val port = hostAndPort(1).toInt
+ val socket = new TSocket(host, port)
+ val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
+
+ val protocol = new TBinaryProtocol(transport)
+ val client = new TCLIService.Client(protocol)
+ transport.open()
+ try {
+ f(client)
+ } finally {
+ socket.close()
+ }
+ }
+
+ protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
+ withThriftClient { client =>
+ val req = new TOpenSessionReq()
+ req.setUsername(user)
+ req.setPassword("anonymous")
+ val resp = client.OpenSession(req)
+ val handle = resp.getSessionHandle
+
+ try {
+ f(client, handle)
+ } finally {
+ val tCloseSessionReq = new TCloseSessionReq(handle)
+ try {
+ client.CloseSession(tCloseSessionReq)
+ } catch {
+ case e: Exception => error(s"Failed to close $handle", e)
+ }
+ }
+ }
+ }
+
protected def checkGetSchemas(
rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {
val expected = dbNames
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala
index dea4d8204..efb48e3d8 100644
--- a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala
@@ -17,88 +17,12 @@
package org.apache.kyuubi.operation.datalake
-import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer}
-import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
-
-class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils {
-
- protected def catalog: String = "hadoop_prod"
-
- private val iceberg: String = {
- System.getProperty("java.class.path")
- .split(":")
- .filter(_.contains("iceberg-spark")).head
- }
-
- private val warehouse = Utils.createTempDir()
-
- /**
- * spark.sql.defaultCatalog=hadoop_prod
- * spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
- * spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
- * spark.sql.catalog.spark_catalog.type=hive
- * spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog
- * spark.sql.catalog.hadoop_prod.type=hadoop
- * spark.sql.catalog.hadoop_prod.warehouse=$warehouse
- */
- override def jdbcUrl: String = getJdbcUrl +
- "#" +
- s"spark.sql.defaultCatalog=$catalog;" +
- "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" +
- "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" +
- "spark.sql.catalog.spark_catalog.type=hive;" +
- s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" +
- s"spark.sql.catalog.$catalog.type=hadoop;" +
- s"spark.sql.catalog.$catalog.warehouse=$warehouse;" +
- s"spark.jars=$iceberg;"
+import org.apache.kyuubi.operation.{BasicIcebergJDBCTests, WithKyuubiServer}
+class IcebergOperationSuite extends WithKyuubiServer with BasicIcebergJDBCTests {
override protected val conf: KyuubiConf = KyuubiConf()
- test("get catalogs") {
- withJdbcStatement() { statement =>
- val metaData = statement.getConnection.getMetaData
- val catalogs = metaData.getCatalogs
- catalogs.next()
- assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
- catalogs.next()
- assert(catalogs.getString(TABLE_CAT) === catalog)
- }
- }
-
- test("get schemas") {
- val dbs = Seq("db1", "db2", "db33", "db44")
- val dbDflts = Seq("default", "global_temp")
-
- withDatabases(dbs: _*) { statement =>
- dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
- val metaData = statement.getConnection.getMetaData
-
- val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
-
- // The session catalog
- allPattern foreach { pattern =>
- checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
- }
-
- Seq(null, catalog).foreach { cg =>
- allPattern foreach { pattern =>
- checkGetSchemas(
- metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog)
- }
- }
-
- Seq("db%", "db.*") foreach { pattern =>
- checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs, catalog)
- }
-
- Seq("db_", "db.") foreach { pattern =>
- checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs.take(2), catalog)
- }
-
- checkGetSchemas(metaData.getSchemas(catalog, "db1"), Seq("db1"), catalog)
- checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog)
- }
- }
+ override def jdbcUrl: String = getJdbcUrl +
+    "#" + icebergConfigs.map { case (k, v) => s"$k=$v" }.mkString(";")
}