diff --git a/dev/kyuubi-codecov/pom.xml b/dev/kyuubi-codecov/pom.xml index ce4758f12..0da5e5391 100644 --- a/dev/kyuubi-codecov/pom.xml +++ b/dev/kyuubi-codecov/pom.xml @@ -204,6 +204,11 @@ kyuubi-spark-connector-kudu_${scala.binary.version} ${project.version} + + org.apache.kyuubi + kyuubi-spark-connector-hive_${scala.binary.version} + ${project.version} + diff --git a/extensions/spark/kyuubi-spark-connector-hive/pom.xml b/extensions/spark/kyuubi-spark-connector-hive/pom.xml index 9f0912992..85c175f99 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/pom.xml +++ b/extensions/spark/kyuubi-spark-connector-hive/pom.xml @@ -27,7 +27,7 @@ 4.0.0 - kyuubi-spark-connector-hive + kyuubi-spark-connector-hive_2.12 Kyuubi Spark Hive Connector A Kyuubi hive connector based on Spark V2 DataSource jar diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index f56b7577d..fae0be65e 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -17,38 +17,24 @@ package org.apache.kyuubi.spark.connector.hive -import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.kyuubi.KyuubiFunSuite -import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession - -class HiveCatalogSuite extends KyuubiFunSuite { +class HiveCatalogSuite extends KyuubiHiveTest { test("get catalog name") { - val sparkConf = new SparkConf() - .setMaster("local[*]") - .set("spark.ui.enabled", "false") - 
.set("spark.sql.catalogImplementation", "hive") - .set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName) - withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + withSparkSession() { spark => val catalog = new HiveTableCatalog - val catalogName = "v2hive" + val catalogName = "hive" catalog.initialize(catalogName, CaseInsensitiveStringMap.empty()) assert(catalog.name() == catalogName) } } test("supports namespaces") { - val sparkConf = new SparkConf() - .setMaster("local[*]") - .set("spark.ui.enabled", "false") - .set("spark.sql.catalogImplementation", "hive") - .set("spark.sql.catalog.v2hive_namespaces", classOf[HiveTableCatalog].getName) - withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + withSparkSession() { spark => try { - spark.sql("USE v2hive_namespaces") + spark.sql("USE hive") spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1") assert(spark.sql(s"SHOW NAMESPACES").collect().length == 2) } finally { @@ -58,16 +44,11 @@ class HiveCatalogSuite extends KyuubiFunSuite { } test("nonexistent table") { - val sparkConf = new SparkConf() - .setMaster("local[*]") - .set("spark.ui.enabled", "false") - .set("spark.sql.catalogImplementation", "hive") - .set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName) - withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + withSparkSession() { spark => val exception = intercept[AnalysisException] { - spark.table("v2hive.ns1.nonexistent_table") + spark.table("hive.ns1.nonexistent_table") } - assert(exception.message === "Table or view not found: v2hive.ns1.nonexistent_table") + assert(exception.message === "Table or view not found: hive.ns1.nonexistent_table") } } } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala index 090558383..99dff4281 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala @@ -17,57 +17,44 @@ package org.apache.kyuubi.spark.connector.hive -import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.AnalysisException -import org.apache.kyuubi.KyuubiFunSuite -import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession +class HiveConnectorSuite extends KyuubiHiveTest { -class HiveConnectorSuite extends KyuubiFunSuite { + def withTempTable(table: String)(f: => Unit): Unit = { + withSparkSession() { spark => + spark.sql( + s""" + | CREATE TABLE IF NOT EXISTS + | $table (id String, date String) + | USING PARQUET + | PARTITIONED BY (date) + |""".stripMargin).collect() - def withTempTable(spark: SparkSession, table: String)(f: => Unit): Unit = { - spark.sql( - s""" - | CREATE TABLE IF NOT EXISTS - | $table (id String, date String) - | USING PARQUET - | PARTITIONED BY (date) - |""".stripMargin).collect() - try f - finally spark.sql(s"DROP TABLE $table") + try f + finally spark.sql(s"DROP TABLE IF EXISTS $table") + } } - test("simple query") { - val sparkConf = new SparkConf() - .setMaster("local[*]") - .set("spark.ui.enabled", "false") - .set("spark.sql.catalogImplementation", "hive") - .set("spark.sql.catalog.hivev2", classOf[HiveTableCatalog].getName) - withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => - val table = "default.employee" - withTempTable(spark, table) { - spark.sql( - s""" - | INSERT INTO - | $table - | VALUES("yi", "2022-08-08") - |""".stripMargin).collect() - + test("A simple query flow.") 
{ + withSparkSession() { spark => + val table = "hive.default.employee" + withTempTable(table) { // can query an existing Hive table in three sections val result = spark.sql( s""" - | SELECT * FROM hivev2.$table + | SELECT * FROM $table |""".stripMargin) - assert(result.collect().head == Row("yi", "2022-08-08")) + assert(result.collect().isEmpty) // error msg should contains catalog info if table is not exist val e = intercept[AnalysisException] { spark.sql( s""" - | SELECT * FROM hivev2.ns1.tb1 + | SELECT * FROM hive.ns1.tb1 |""".stripMargin) } - assert(e.getMessage().contains("Table or view not found: hivev2.ns1.tb1")) + assert(e.getMessage().contains("Table or view not found: hive.ns1.tb1")) } } } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala new file mode 100644 index 000000000..0f5ecc021 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.hive + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.spark.connector.common.LocalSparkSession + +abstract class KyuubiHiveTest extends KyuubiFunSuite with Logging { + + private var spark: SparkSession = _ + + override def beforeEach(): Unit = { + super.beforeEach() + getOrCreateSpark() + } + + override def afterEach(): Unit = { + super.afterEach() + LocalSparkSession.stop(spark) + } + + def getOrCreateSpark(): Unit = { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "hive") + .set("spark.sql.catalog.hive", classOf[HiveTableCatalog].getName) + .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true") + .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") + + spark = SparkSession.builder.config(sparkConf).getOrCreate() + } + + def withSparkSession[T](conf: Map[String, String] = Map.empty[String, String])( + f: SparkSession => T): T = { + assert(spark != null) + conf.foreach { + case (k, v) => spark.sessionState.conf.setConfString(k, v) + } + f(spark) + }
}