[KYUUBI #329] Support tests with Apache Iceberg in engine
[PR #329](https://github.com/yaooqinn/kyuubi/pull/329)

### _Why are the changes needed?_

Support tests with Apache Iceberg in engine:

- Improve the debugging experience for developers
- Improve test coverage

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run tests](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #329 from yaooqinn/iceberg2.

8bf6714 [Kent Yao] Reflect IcebergOperationSuite
329593a [Kent Yao] Reflect IcebergOperationSuite

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:

- parent 787f53e098
- commit f186fcb15e

#### externals/kyuubi-spark-sql-engine/pom.xml
```diff
@@ -63,8 +63,13 @@
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>${iceberg.name}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
```
#### Shim_v3_0 (Spark engine shim)

```diff
@@ -66,7 +66,8 @@ class Shim_v3_0 extends Shim_v2_4 {
       val catalog = manager.currentCatalog
       (getSchemas(catalog, schemaPattern) ++ viewMgr).map(Row(_, catalog.name()))
     } else {
-      (getSchemas(manager.catalog(catalogName), schemaPattern) ++ viewMgr).map(Row(_, catalogName))
+      val catalogPlugin = manager.catalog(catalogName)
+      (getSchemas(catalogPlugin, schemaPattern) ++ viewMgr).map(Row(_, catalogName))
     }
   }
 }
```
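For orientation, `getSchemas` here ultimately yields one `Row` per (schema, catalog name) pair. A minimal sketch of that output shape, assuming only `spark-sql` on the classpath (the schema and catalog values are illustrative only):

```scala
import org.apache.spark.sql.Row

// One Row per schema, tagged with the catalog it belongs to.
val schemas: Seq[String] = Seq("default", "db1", "db2")
val rows: Seq[Row] = schemas.map(Row(_, "hadoop_prod"))

assert(rows.head.getString(0) == "default")
assert(rows.head.getString(1) == "hadoop_prod")
```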
#### WithSparkSQLEngine (org.apache.kyuubi.engine.spark)

```diff
@@ -19,35 +19,29 @@ package org.apache.kyuubi.engine.spark

 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
 import org.apache.spark.sql.SparkSession
-import org.apache.thrift.protocol.TBinaryProtocol
-import org.apache.thrift.transport.TSocket

-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.operation.JDBCTests
-import org.apache.kyuubi.service.authentication.PlainSASLHelper
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}

-trait WithSparkSQLEngine extends JDBCTests {
+trait WithSparkSQLEngine extends KyuubiFunSuite {
+  protected var spark: SparkSession = _
+  protected var engine: SparkSQLEngine = _

-  val warehousePath = Utils.createTempDir()
-  val metastorePath = Utils.createTempDir()
-  warehousePath.toFile.delete()
-  metastorePath.toFile.delete()
-  System.setProperty("javax.jdo.option.ConnectionURL",
-    s"jdbc:derby:;databaseName=$metastorePath;create=true")
-  System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
-  System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
+  protected var connectionUrl: String = _

-  SparkSession.clearActiveSession()
-  SparkSession.clearDefaultSession()
-  protected val spark: SparkSession = SparkSQLEngine.createSpark()
+  override def beforeAll(): Unit = {
+    val warehousePath = Utils.createTempDir()
+    val metastorePath = Utils.createTempDir()
+    warehousePath.toFile.delete()
+    metastorePath.toFile.delete()
+    System.setProperty("javax.jdo.option.ConnectionURL",
+      s"jdbc:derby:;databaseName=$metastorePath;create=true")
+    System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
+    System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
+
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    spark = SparkSQLEngine.createSpark()
+    engine = SparkSQLEngine.startEngine(spark)
+    connectionUrl = engine.connectionUrl
+    super.beforeAll()
```
```diff
@@ -58,48 +52,12 @@ trait WithSparkSQLEngine extends JDBCTests {
-    spark.stop()
+    if (engine != null) {
+      engine.stop()
+    }
+    if (spark != null) {
+      spark.stop()
+    }
     SessionState.detachSession()
     Hive.closeCurrent()
   }

-  protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
-
-  protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
-    val hostAndPort = connectionUrl.split(":")
-    val host = hostAndPort.head
-    val port = hostAndPort(1).toInt
-    val socket = new TSocket(host, port)
-    val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
-
-    val protocol = new TBinaryProtocol(transport)
-    val client = new TCLIService.Client(protocol)
-    transport.open()
-    try {
-      f(client)
-    } finally {
-      socket.close()
-    }
-  }
-
-  protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
-    withThriftClient { client =>
-      val req = new TOpenSessionReq()
-      req.setUsername(user)
-      req.setPassword("anonymous")
-      val resp = client.OpenSession(req)
-      val handle = resp.getSessionHandle
-
-      try {
-        f(client, handle)
-      } finally {
-        val tCloseSessionReq = new TCloseSessionReq(handle)
-        try {
-          client.CloseSession(tCloseSessionReq)
-        } catch {
-          case e: Exception => error(s"Failed to close $handle", e)
-        }
-      }
-    }
-  }
+  protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
 }
```
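The reworked trait now owns the full engine lifecycle (create, start, stop), so a concrete suite only has to mix it in and connect to `getJdbcUrl`. A minimal sketch of such a suite, assuming the Kyuubi test classpath; `EngineSmokeSuite` is a hypothetical name, not part of this PR:

```scala
package org.apache.kyuubi.engine.spark.operation

import java.sql.DriverManager

import org.apache.kyuubi.engine.spark.WithSparkSQLEngine

// Hypothetical suite: WithSparkSQLEngine starts the engine in beforeAll and
// exposes getJdbcUrl, so tests can talk to it over plain JDBC.
class EngineSmokeSuite extends WithSparkSQLEngine {

  test("select a literal over JDBC") {
    val conn = DriverManager.getConnection(getJdbcUrl)
    try {
      val rs = conn.createStatement().executeQuery("SELECT 1")
      assert(rs.next() && rs.getInt(1) === 1)
    } finally {
      conn.close()
    }
  }
}
```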
#### New file: SparkIcebergOperationSuite (org.apache.kyuubi.engine.spark.operation)

```diff
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.BasicIcebergJDBCTests
+
+class SparkIcebergOperationSuite extends WithSparkSQLEngine with BasicIcebergJDBCTests {
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  override def beforeAll(): Unit = {
+    for ((k, v) <- icebergConfigs) {
+      System.setProperty(k, v)
+    }
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    for ((k, _) <- icebergConfigs) {
+      System.clearProperty(k)
+    }
+  }
+}
```
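The suite must set the Iceberg configs as JVM system properties before `super.beforeAll()` creates the SparkSession, and clear them afterwards so they do not leak into sibling suites. That set-then-clear discipline can be captured as a small reusable helper; a sketch under that assumption (`withSystemProperties` is a hypothetical utility, not part of this PR):

```scala
// Hypothetical helper: scope a set of JVM system properties to a block of code.
def withSystemProperties[T](props: Map[String, String])(body: => T): T = {
  // Remember prior values so they can be restored, not just cleared.
  val previous = props.keys.map(k => k -> Option(System.getProperty(k))).toMap
  props.foreach { case (k, v) => System.setProperty(k, v) }
  try body
  finally previous.foreach {
    case (k, Some(old)) => System.setProperty(k, old) // restore the prior value
    case (k, None)      => System.clearProperty(k)    // or remove it entirely
  }
}
```

The PR achieves the same scoping at suite granularity by relying on `beforeAll`/`afterAll` ordering, which is what the ScalaTest lifecycle guarantees.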
#### SparkOperationSuite (org.apache.kyuubi.engine.spark.operation)

```diff
@@ -33,9 +33,12 @@ import org.apache.spark.sql.types._

 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.JDBCTests
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._

-class SparkOperationSuite extends WithSparkSQLEngine {
+class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests {
+
+  override protected def jdbcUrl: String = getJdbcUrl

   test("get table types") {
     withJdbcStatement() { statement =>
```
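The pattern here, and in `BasicIcebergJDBCTests` below, is to write the JDBC assertions once in a trait keyed off an abstract `jdbcUrl`, then bind that URL differently per backend. A minimal sketch of the pattern with illustrative names, assuming only ScalaTest on the classpath:

```scala
import org.scalatest.funsuite.AnyFunSuite

// Tests written once against an abstract URL...
trait SharedJdbcTests extends AnyFunSuite {
  protected def jdbcUrl: String

  test("url is well formed") {
    assert(jdbcUrl.startsWith("jdbc:hive2://"))
  }
}

// ...and bound to a concrete backend per suite.
class EngineBackedSuite extends SharedJdbcTests {
  override protected def jdbcUrl: String = "jdbc:hive2://localhost:10009/;" // placeholder
}
```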
#### New file: BasicIcebergJDBCTests (org.apache.kyuubi.operation)

```diff
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.nio.file.Path
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+
+trait BasicIcebergJDBCTests extends JDBCTestUtils {
+
+  protected def catalog: String = "hadoop_prod"
+
+  protected val iceberg: String = {
+    System.getProperty("java.class.path")
+      .split(":")
+      .filter(_.contains("iceberg-spark")).head
+  }
+
+  protected val warehouse: Path = Utils.createTempDir()
+
+  protected val icebergConfigs = Map(
+    "spark.sql.defaultCatalog" -> catalog,
+    "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+    "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog",
+    "spark.sql.catalog.spark_catalog.type" -> "hive",
+    s"spark.sql.catalog.$catalog" -> "org.apache.iceberg.spark.SparkCatalog",
+    s"spark.sql.catalog.$catalog.type" -> "hadoop",
+    s"spark.sql.catalog.$catalog.warehouse" -> warehouse.toString,
+    "spark.jars" -> iceberg)
+
+  test("get catalogs") {
+    withJdbcStatement() { statement =>
+      val metaData = statement.getConnection.getMetaData
+      val catalogs = metaData.getCatalogs
+      catalogs.next()
+      assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
+      catalogs.next()
+      assert(catalogs.getString(TABLE_CAT) === catalog)
+    }
+  }
+
+  test("get schemas") {
+    val dbs = Seq("db1", "db2", "db33", "db44")
+    val dbDflts = Seq("default", "global_temp")
+
+    withDatabases(dbs: _*) { statement =>
+      dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
+      val metaData = statement.getConnection.getMetaData
+
+      val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
+
+      // The session catalog
+      allPattern foreach { pattern =>
+        checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
+      }
+
+      Seq(null, catalog).foreach { cg =>
+        allPattern foreach { pattern =>
+          checkGetSchemas(
+            metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog)
+        }
+      }
+
+      Seq("db%", "db.*") foreach { pattern =>
+        checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs, catalog)
+      }
+
+      Seq("db_", "db.") foreach { pattern =>
+        checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs.take(2), catalog)
+      }
+
+      checkGetSchemas(metaData.getSchemas(catalog, "db1"), Seq("db1"), catalog)
+      checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog)
+    }
+  }
+}
```
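For readers less familiar with the JDBC metadata API these tests exercise: `DatabaseMetaData.getCatalogs`/`getSchemas` is the standard client-side way to enumerate catalogs and namespaces. A standalone sketch against a placeholder URL (the host, port, and patterns are illustrative only):

```scala
import java.sql.DriverManager

object MetaDataDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder URL: point this at a running Kyuubi server or Spark engine.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/;")
    try {
      val md = conn.getMetaData

      // Enumerate catalogs, e.g. spark_catalog and hadoop_prod in these tests.
      val catalogs = md.getCatalogs
      while (catalogs.next()) println(catalogs.getString("TABLE_CAT"))

      // SQL metadata patterns: '%' matches any sequence, '_' a single character,
      // which is why "db%" matches all four dbs and "db_" only db1/db2 above.
      val schemas = md.getSchemas(null, "db%")
      while (schemas.next()) println(schemas.getString("TABLE_SCHEM"))
    } finally {
      conn.close()
    }
  }
}
```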
#### JDBCTestUtils (org.apache.kyuubi.operation)

```diff
@@ -20,7 +20,12 @@ package org.apache.kyuubi.operation

 import java.sql.{DriverManager, ResultSet, Statement}
 import java.util.Locale

+import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.TSocket
+
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.service.authentication.PlainSASLHelper

 trait JDBCTestUtils extends KyuubiFunSuite {
```
```diff
@@ -73,6 +78,44 @@ trait JDBCTestUtils extends KyuubiFunSuite {
     withMultipleConnectionJdbcStatement(tableNames: _*)(f)
   }

+  protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
+    val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
+    val host = hostAndPort.head
+    val port = hostAndPort(1).toInt
+    val socket = new TSocket(host, port)
+    val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
+
+    val protocol = new TBinaryProtocol(transport)
+    val client = new TCLIService.Client(protocol)
+    transport.open()
+    try {
+      f(client)
+    } finally {
+      socket.close()
+    }
+  }
+
+  protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
+    withThriftClient { client =>
+      val req = new TOpenSessionReq()
+      req.setUsername(user)
+      req.setPassword("anonymous")
+      val resp = client.OpenSession(req)
+      val handle = resp.getSessionHandle
+
+      try {
+        f(client, handle)
+      } finally {
+        val tCloseSessionReq = new TCloseSessionReq(handle)
+        try {
+          client.CloseSession(tCloseSessionReq)
+        } catch {
+          case e: Exception => error(s"Failed to close $handle", e)
+        }
+      }
+    }
+  }
+
   protected def checkGetSchemas(
       rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {
     val expected = dbNames
```
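With these helpers moved into `JDBCTestUtils`, any suite can drive the server through the raw HiveServer2 Thrift API rather than JDBC. A sketch of issuing a statement via `withSessionHandle`; the `SELECT 1` and the success-code check are illustrative, and `TExecuteStatementReq`/`TStatusCode` come from the same `org.apache.hive.service.rpc.thrift` package already imported above:

```scala
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TStatusCode}

test("execute a statement over raw thrift") {
  withSessionHandle { (client, handle) =>
    // Build and fire an ExecuteStatement RPC against the open session.
    val req = new TExecuteStatementReq(handle, "SELECT 1")
    val resp = client.ExecuteStatement(req)
    assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
  }
}
```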
#### IcebergOperationSuite (org.apache.kyuubi.operation.datalake)

```diff
@@ -17,88 +17,12 @@

 package org.apache.kyuubi.operation.datalake

-import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer}
-import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+import org.apache.kyuubi.operation.{BasicIcebergJDBCTests, WithKyuubiServer}

-class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils {
-
-  protected def catalog: String = "hadoop_prod"
-
-  private val iceberg: String = {
-    System.getProperty("java.class.path")
-      .split(":")
-      .filter(_.contains("iceberg-spark")).head
-  }
-
-  private val warehouse = Utils.createTempDir()
-
-  /**
-   * spark.sql.defaultCatalog=hadoop_prod
-   * spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
-   * spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
-   * spark.sql.catalog.spark_catalog.type=hive
-   * spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog
-   * spark.sql.catalog.hadoop_prod.type=hadoop
-   * spark.sql.catalog.hadoop_prod.warehouse=$warehouse
-   */
-  override def jdbcUrl: String = getJdbcUrl +
-    "#" +
-    s"spark.sql.defaultCatalog=$catalog;" +
-    "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" +
-    "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" +
-    "spark.sql.catalog.spark_catalog.type=hive;" +
-    s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" +
-    s"spark.sql.catalog.$catalog.type=hadoop;" +
-    s"spark.sql.catalog.$catalog.warehouse=$warehouse;" +
-    s"spark.jars=$iceberg;"
+class IcebergOperationSuite extends WithKyuubiServer with BasicIcebergJDBCTests {

   override protected val conf: KyuubiConf = KyuubiConf()

-  test("get catalogs") {
-    withJdbcStatement() { statement =>
-      val metaData = statement.getConnection.getMetaData
-      val catalogs = metaData.getCatalogs
-      catalogs.next()
-      assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
-      catalogs.next()
-      assert(catalogs.getString(TABLE_CAT) === catalog)
-    }
-  }
-
-  test("get schemas") {
-    val dbs = Seq("db1", "db2", "db33", "db44")
-    val dbDflts = Seq("default", "global_temp")
-
-    withDatabases(dbs: _*) { statement =>
-      dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
-      val metaData = statement.getConnection.getMetaData
-
-      val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
-
-      // The session catalog
-      allPattern foreach { pattern =>
-        checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
-      }
-
-      Seq(null, catalog).foreach { cg =>
-        allPattern foreach { pattern =>
-          checkGetSchemas(
-            metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog)
-        }
-      }
-
-      Seq("db%", "db.*") foreach { pattern =>
-        checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs, catalog)
-      }
-
-      Seq("db_", "db.") foreach { pattern =>
-        checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs.take(2), catalog)
-      }
-
-      checkGetSchemas(metaData.getSchemas(catalog, "db1"), Seq("db1"), catalog)
-      checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog)
-    }
-  }
+  override def jdbcUrl: String = getJdbcUrl +
+    "#" + icebergConfigs.map { case (k, v) => k + "=" + v }.mkString(";")
 }
```
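The `#` fragment of a HiveServer2-style JDBC URL carries session variables; Kyuubi uses it here to hand the Iceberg catalog configuration to the Spark engine at connect time. A sketch of the URL this construction produces, with a placeholder host and port and a trimmed config map:

```scala
// Placeholder base URL; getJdbcUrl would supply the real server address.
val base = "jdbc:hive2://localhost:10009/;"

val icebergConfigs = Map(
  "spark.sql.defaultCatalog" -> "hadoop_prod",
  "spark.sql.catalog.hadoop_prod" -> "org.apache.iceberg.spark.SparkCatalog",
  "spark.sql.catalog.hadoop_prod.type" -> "hadoop")

// Join the confs into the URL fragment: #k1=v1;k2=v2;...
val url = base + "#" + icebergConfigs.map { case (k, v) => s"$k=$v" }.mkString(";")
// jdbc:hive2://localhost:10009/;#spark.sql.defaultCatalog=hadoop_prod;...
```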