[KYUUBI #3378][SUBTASK] Improve hive-connector module tests

### _Why are the changes needed?_

Fix https://github.com/apache/incubator-kyuubi/issues/3378

This PR aims to improve the hive-connector module tests and make CI run the tests for the hive connector.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3379 from Yikf/hive-connector-test.

Closes #3378

72ad050c [yikf] CI test for hive-connector

Authored-by: yikf <yikaifei1@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
yikf 2022-09-03 04:44:01 +08:00 committed by Cheng Pan
parent 23a8ccd538
commit 3adcebd557
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
5 changed files with 98 additions and 64 deletions

View File

@ -204,6 +204,11 @@
<artifactId>kyuubi-spark-connector-kudu_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

View File

@ -27,7 +27,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kyuubi-spark-connector-hive</artifactId>
<artifactId>kyuubi-spark-connector-hive_2.12</artifactId>
<name>Kyuubi Spark Hive Connector</name>
<description>A Kyuubi hive connector based on Spark V2 DataSource</description>
<packaging>jar</packaging>

View File

@ -17,38 +17,24 @@
package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
class HiveCatalogSuite extends KyuubiFunSuite {
class HiveCatalogSuite extends KyuubiHiveTest {
test("get catalog name") {
val sparkConf = new SparkConf()
.setMaster("local[*]")
.set("spark.ui.enabled", "false")
.set("spark.sql.catalogImplementation", "hive")
.set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName)
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
withSparkSession() { spark =>
val catalog = new HiveTableCatalog
val catalogName = "v2hive"
val catalogName = "hive"
catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
assert(catalog.name() == catalogName)
}
}
test("supports namespaces") {
val sparkConf = new SparkConf()
.setMaster("local[*]")
.set("spark.ui.enabled", "false")
.set("spark.sql.catalogImplementation", "hive")
.set("spark.sql.catalog.v2hive_namespaces", classOf[HiveTableCatalog].getName)
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
withSparkSession() { spark =>
try {
spark.sql("USE v2hive_namespaces")
spark.sql("USE hive")
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1")
assert(spark.sql(s"SHOW NAMESPACES").collect().length == 2)
} finally {
@ -58,16 +44,11 @@ class HiveCatalogSuite extends KyuubiFunSuite {
}
test("nonexistent table") {
val sparkConf = new SparkConf()
.setMaster("local[*]")
.set("spark.ui.enabled", "false")
.set("spark.sql.catalogImplementation", "hive")
.set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName)
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
withSparkSession() { spark =>
val exception = intercept[AnalysisException] {
spark.table("v2hive.ns1.nonexistent_table")
spark.table("hive.ns1.nonexistent_table")
}
assert(exception.message === "Table or view not found: v2hive.ns1.nonexistent_table")
assert(exception.message === "Table or view not found: hive.ns1.nonexistent_table")
}
}
}

View File

@ -17,57 +17,44 @@
package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.AnalysisException
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
class HiveConnectorSuite extends KyuubiHiveTest {
class HiveConnectorSuite extends KyuubiFunSuite {
def withTempTable(table: String)(f: => Unit): Unit = {
withSparkSession() { spark =>
spark.sql(
s"""
| CREATE TABLE IF NOT EXISTS
| $table (id String, date String)
| USING PARQUET
| PARTITIONED BY (date)
|""".stripMargin).collect()
def withTempTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
spark.sql(
s"""
| CREATE TABLE IF NOT EXISTS
| $table (id String, date String)
| USING PARQUET
| PARTITIONED BY (date)
|""".stripMargin).collect()
try f
finally spark.sql(s"DROP TABLE $table")
try f
finally spark.sql(s"DROP TABLE IF EXISTS $table")
}
}
test("simple query") {
val sparkConf = new SparkConf()
.setMaster("local[*]")
.set("spark.ui.enabled", "false")
.set("spark.sql.catalogImplementation", "hive")
.set("spark.sql.catalog.hivev2", classOf[HiveTableCatalog].getName)
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
val table = "default.employee"
withTempTable(spark, table) {
spark.sql(
s"""
| INSERT INTO
| $table
| VALUES("yi", "2022-08-08")
|""".stripMargin).collect()
test("A simple query flow.") {
withSparkSession() { spark =>
val table = "hive.default.employee"
withTempTable(table) {
// can query an existing Hive table via a three-part identifier (catalog.database.table)
val result = spark.sql(
s"""
| SELECT * FROM hivev2.$table
| SELECT * FROM $table
|""".stripMargin)
assert(result.collect().head == Row("yi", "2022-08-08"))
assert(result.collect().isEmpty)
// error msg should contain catalog info if the table does not exist
val e = intercept[AnalysisException] {
spark.sql(
s"""
| SELECT * FROM hivev2.ns1.tb1
| SELECT * FROM hive.ns1.tb1
|""".stripMargin)
}
assert(e.getMessage().contains("Table or view not found: hivev2.ns1.tb1"))
assert(e.getMessage().contains("Table or view not found: hive.ns1.tb1"))
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession
/**
 * Base class for Hive connector tests. Creates a fresh local SparkSession
 * (with an in-memory Derby metastore and the Kyuubi [[HiveTableCatalog]]
 * registered under the catalog name "hive") before each test and stops it
 * after each test, so tests are isolated from one another.
 */
abstract class KyuubiHiveTest extends KyuubiFunSuite with Logging {

  // Per-test SparkSession; (re)created in beforeEach, stopped in afterEach.
  private var spark: SparkSession = _

  override def beforeEach(): Unit = {
    // Fix: the original called super.beforeAll() here, which skips the
    // per-test setup chain of any mixed-in BeforeAndAfterEach traits and
    // re-runs suite-level setup for every single test.
    super.beforeEach()
    getOrCreateSpark()
  }

  override def afterEach(): Unit = {
    // Fix: the original called super.afterAll() here (wrong lifecycle hook).
    // Stop the session first, and always run the super teardown chain even
    // if stopping fails, so later hooks are not silently skipped.
    try {
      LocalSparkSession.stop(spark)
      spark = null
    } finally {
      super.afterEach()
    }
  }

  /**
   * Builds the per-test SparkSession.
   *
   * The Derby metastore lives purely in memory ("memorydb") so each JVM run
   * starts from an empty catalog and no on-disk metastore files are leaked.
   */
  def getOrCreateSpark(): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.ui.enabled", "false")
      .set("spark.sql.catalogImplementation", "hive")
      .set("spark.sql.catalog.hive", classOf[HiveTableCatalog].getName)
      .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
      .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
    spark = SparkSession.builder.config(sparkConf).getOrCreate()
  }

  /**
   * Runs `f` against the per-test SparkSession, applying the supplied
   * session-level configuration overrides first.
   *
   * @param conf extra SQL conf entries set on the session before `f` runs
   * @param f    test body receiving the active SparkSession
   */
  def withSparkSession[T](conf: Map[String, String] = Map.empty[String, String])(
      f: SparkSession => T): T = {
    assert(spark != null, "SparkSession not initialized; beforeEach must run first")
    conf.foreach { case (k, v) =>
      spark.sessionState.conf.setConfString(k, v)
    }
    f(spark)
  }
}