From f186fcb15eb22c9db178ebf5f8fb6c44d3c19b98 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 31 Jan 2021 02:18:18 +0800 Subject: [PATCH] [KYUUBI #329] Support tests with Apache Iceberg in engine ![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![Closes%20#329](https://badgen.net/badge/Preview/Closes%2520%23329/blue)](https://github.com/yaooqinn/kyuubi/pull/329) ![209](https://badgen.net/badge/%2B/209/red) ![144](https://badgen.net/badge/-/144/green) ![2](https://badgen.net/badge/commits/2/yellow) ![Target Issue](https://badgen.net/badge/Missing/Target%20Issue/ff0000) [❨?❩](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info) ### _Why are the changes needed?_ Support tests with Apache Iceberg in engine - Improve debug experience for developers - Improve test coverage ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request Closes #329 from yaooqinn/iceberg2. 
8bf6714 [Kent Yao] Reflect IcebergOperationSuit 329593a [Kent Yao] Reflect IcebergOperationSuit Authored-by: Kent Yao Signed-off-by: Kent Yao --- externals/kyuubi-spark-sql-engine/pom.xml | 7 +- .../kyuubi/engine/spark/shim/Shim_v3_0.scala | 3 +- .../engine/spark/WithSparkSQLEngine.scala | 80 ++++------------ .../SparkIcebergOperationSuite.scala | 39 ++++++++ .../spark/operation/SparkOperationSuite.scala | 5 +- .../operation/BasicIcebergJDBCTests.scala | 92 +++++++++++++++++++ .../kyuubi/operation/JDBCTestUtils.scala | 43 +++++++++ .../datalake/IcebergOperationSuite.scala | 84 +---------------- 8 files changed, 209 insertions(+), 144 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala create mode 100644 kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml index e85ca79eb..53f8bfc1e 100644 --- a/externals/kyuubi-spark-sql-engine/pom.xml +++ b/externals/kyuubi-spark-sql-engine/pom.xml @@ -63,8 +63,13 @@ spark-hive-thriftserver_${scala.binary.version} test - + + org.apache.iceberg + ${iceberg.name} + test + + target/scala-${scala.binary.version}/classes diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala index cb9392680..0dbc950af 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala @@ -66,7 +66,8 @@ class Shim_v3_0 extends Shim_v2_4 { val catalog = manager.currentCatalog (getSchemas(catalog, schemaPattern) ++ viewMgr).map(Row(_, catalog.name())) } else { - (getSchemas(manager.catalog(catalogName), 
schemaPattern) ++ viewMgr).map(Row(_, catalogName)) + val catalogPlugin = manager.catalog(catalogName) + (getSchemas(catalogPlugin, schemaPattern) ++ viewMgr).map(Row(_, catalogName)) } } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index bf8d4f682..ee9c17296 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -19,35 +19,29 @@ package org.apache.kyuubi.engine.spark import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle} import org.apache.spark.sql.SparkSession -import org.apache.thrift.protocol.TBinaryProtocol -import org.apache.thrift.transport.TSocket -import org.apache.kyuubi.Utils -import org.apache.kyuubi.operation.JDBCTests -import org.apache.kyuubi.service.authentication.PlainSASLHelper - -trait WithSparkSQLEngine extends JDBCTests { - - val warehousePath = Utils.createTempDir() - val metastorePath = Utils.createTempDir() - warehousePath.toFile.delete() - metastorePath.toFile.delete() - System.setProperty("javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=$metastorePath;create=true") - System.setProperty("spark.sql.warehouse.dir", warehousePath.toString) - System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc") - - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - protected val spark: SparkSession = SparkSQLEngine.createSpark() +import org.apache.kyuubi.{KyuubiFunSuite, Utils} +trait WithSparkSQLEngine extends KyuubiFunSuite { + protected var spark: SparkSession = _ protected var engine: 
SparkSQLEngine = _ protected var connectionUrl: String = _ override def beforeAll(): Unit = { + val warehousePath = Utils.createTempDir() + val metastorePath = Utils.createTempDir() + warehousePath.toFile.delete() + metastorePath.toFile.delete() + System.setProperty("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=$metastorePath;create=true") + System.setProperty("spark.sql.warehouse.dir", warehousePath.toString) + System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc") + + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + spark = SparkSQLEngine.createSpark() engine = SparkSQLEngine.startEngine(spark) connectionUrl = engine.connectionUrl super.beforeAll() @@ -58,48 +52,12 @@ trait WithSparkSQLEngine extends JDBCTests { if (engine != null) { engine.stop() } - spark.stop() + if (spark != null) { + spark.stop() + } SessionState.detachSession() Hive.closeCurrent() } - protected def jdbcUrl: String = s"jdbc:hive2://$connectionUrl/;" - - protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = { - val hostAndPort = connectionUrl.split(":") - val host = hostAndPort.head - val port = hostAndPort(1).toInt - val socket = new TSocket(host, port) - val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket) - - val protocol = new TBinaryProtocol(transport) - val client = new TCLIService.Client(protocol) - transport.open() - try { - f(client) - } finally { - socket.close() - } - } - - protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername(user) - req.setPassword("anonymous") - val resp = client.OpenSession(req) - val handle = resp.getSessionHandle - - try { - f(client, handle) - } finally { - val tCloseSessionReq = new TCloseSessionReq(handle) - try { - client.CloseSession(tCloseSessionReq) - } catch { - case e: Exception => error(s"Failed to close 
$handle", e) - } - } - } - } + protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;" } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala new file mode 100644 index 000000000..2c8d21c29 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine.spark.operation + +import org.apache.kyuubi.engine.spark.WithSparkSQLEngine +import org.apache.kyuubi.operation.BasicIcebergJDBCTests + +class SparkIcebergOperationSuite extends WithSparkSQLEngine with BasicIcebergJDBCTests { + override protected def jdbcUrl: String = getJdbcUrl + + override def beforeAll(): Unit = { + for ((k, v) <- icebergConfigs) { + System.setProperty(k, v) + } + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + for ((k, _) <- icebergConfigs) { + System.clearProperty(k) + } + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index 162790e91..b00049ee6 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -33,9 +33,12 @@ import org.apache.spark.sql.types._ import org.apache.kyuubi.Utils import org.apache.kyuubi.engine.spark.WithSparkSQLEngine +import org.apache.kyuubi.operation.JDBCTests import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ -class SparkOperationSuite extends WithSparkSQLEngine { +class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests { + + override protected def jdbcUrl: String = getJdbcUrl test("get table types") { withJdbcStatement() { statement => diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala new file mode 100644 index 000000000..d525c5406 --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import java.nio.file.Path + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT + +trait BasicIcebergJDBCTests extends JDBCTestUtils { + + protected def catalog: String = "hadoop_prod" + + protected val iceberg: String = { + System.getProperty("java.class.path") + .split(":") + .filter(_.contains("iceberg-spark")).head + } + + protected val warehouse: Path = Utils.createTempDir() + + protected val icebergConfigs = Map( + "spark.sql.defaultCatalog" -> catalog, + "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.type" -> "hive", + s"spark.sql.catalog.$catalog" -> "org.apache.iceberg.spark.SparkCatalog", + s"spark.sql.catalog.$catalog.type" -> "hadoop", + s"spark.sql.catalog.$catalog.warehouse" -> warehouse.toString, + "spark.jars" -> iceberg) + + test("get catalogs") { + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + val catalogs = metaData.getCatalogs + catalogs.next() + assert(catalogs.getString(TABLE_CAT) 
=== "spark_catalog") + catalogs.next() + assert(catalogs.getString(TABLE_CAT) === catalog) + } + } + + test("get schemas") { + val dbs = Seq("db1", "db2", "db33", "db44") + val dbDflts = Seq("default", "global_temp") + + withDatabases(dbs: _*) { statement => + dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db")) + val metaData = statement.getConnection.getMetaData + + val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%") + + // The session catalog + allPattern foreach { pattern => + checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog") + } + + Seq(null, catalog).foreach { cg => + allPattern foreach { pattern => + checkGetSchemas( + metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog) + } + } + + Seq("db%", "db.*") foreach { pattern => + checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs, catalog) + } + + Seq("db_", "db.") foreach { pattern => + checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs.take(2), catalog) + } + + checkGetSchemas(metaData.getSchemas(catalog, "db1"), Seq("db1"), catalog) + checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog) + } + } +} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala index 9817ea25b..ef8fac66d 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala @@ -20,7 +20,12 @@ package org.apache.kyuubi.operation import java.sql.{DriverManager, ResultSet, Statement} import java.util.Locale +import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle} +import org.apache.thrift.protocol.TBinaryProtocol +import org.apache.thrift.transport.TSocket + import org.apache.kyuubi.{KyuubiFunSuite, Utils} +import 
org.apache.kyuubi.service.authentication.PlainSASLHelper trait JDBCTestUtils extends KyuubiFunSuite { @@ -73,6 +78,44 @@ trait JDBCTestUtils extends KyuubiFunSuite { withMultipleConnectionJdbcStatement(tableNames: _*)(f) } + protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = { + val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":") + val host = hostAndPort.head + val port = hostAndPort(1).toInt + val socket = new TSocket(host, port) + val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket) + + val protocol = new TBinaryProtocol(transport) + val client = new TCLIService.Client(protocol) + transport.open() + try { + f(client) + } finally { + socket.close() + } + } + + protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername(user) + req.setPassword("anonymous") + val resp = client.OpenSession(req) + val handle = resp.getSessionHandle + + try { + f(client, handle) + } finally { + val tCloseSessionReq = new TCloseSessionReq(handle) + try { + client.CloseSession(tCloseSessionReq) + } catch { + case e: Exception => error(s"Failed to close $handle", e) + } + } + } + } + protected def checkGetSchemas( rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = { val expected = dbNames diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala index dea4d8204..efb48e3d8 100644 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala @@ -17,88 +17,12 @@ package org.apache.kyuubi.operation.datalake -import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf -import 
org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer} -import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT - -class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils { - - protected def catalog: String = "hadoop_prod" - - private val iceberg: String = { - System.getProperty("java.class.path") - .split(":") - .filter(_.contains("iceberg-spark")).head - } - - private val warehouse = Utils.createTempDir() - - /** - * spark.sql.defaultCatalog=hadoop_prod - * spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions - * spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog - * spark.sql.catalog.spark_catalog.type=hive - * spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog - * spark.sql.catalog.hadoop_prod.type=hadoop - * spark.sql.catalog.hadoop_prod.warehouse=$warehouse - */ - override def jdbcUrl: String = getJdbcUrl + - "#" + - s"spark.sql.defaultCatalog=$catalog;" + - "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" + - "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" + - "spark.sql.catalog.spark_catalog.type=hive;" + - s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" + - s"spark.sql.catalog.$catalog.type=hadoop;" + - s"spark.sql.catalog.$catalog.warehouse=$warehouse;" + - s"spark.jars=$iceberg;" +import org.apache.kyuubi.operation.{BasicIcebergJDBCTests, WithKyuubiServer} +class IcebergOperationSuite extends WithKyuubiServer with BasicIcebergJDBCTests { override protected val conf: KyuubiConf = KyuubiConf() - test("get catalogs") { - withJdbcStatement() { statement => - val metaData = statement.getConnection.getMetaData - val catalogs = metaData.getCatalogs - catalogs.next() - assert(catalogs.getString(TABLE_CAT) === "spark_catalog") - catalogs.next() - assert(catalogs.getString(TABLE_CAT) === catalog) - } - } - - test("get schemas") { - val dbs = Seq("db1", 
"db2", "db33", "db44") - val dbDflts = Seq("default", "global_temp") - - withDatabases(dbs: _*) { statement => - dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db")) - val metaData = statement.getConnection.getMetaData - - val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%") - - // The session catalog - allPattern foreach { pattern => - checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog") - } - - Seq(null, catalog).foreach { cg => - allPattern foreach { pattern => - checkGetSchemas( - metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog) - } - } - - Seq("db%", "db.*") foreach { pattern => - checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs, catalog) - } - - Seq("db_", "db.") foreach { pattern => - checkGetSchemas(metaData.getSchemas(catalog, pattern), dbs.take(2), catalog) - } - - checkGetSchemas(metaData.getSchemas(catalog, "db1"), Seq("db1"), catalog) - checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog) - } - } + override def jdbcUrl: String = getJdbcUrl + + "#" + icebergConfigs.map {case (k, v) => k + "=" + v}.mkString(";") }