[KYUUBI #4999] [KSHC] Kyuubi-Spark-Hive-Connector support Apache Spark 3.4
### _Why are the changes needed?_ This pr amins to make KSHC support Apache Spark 3.4. - KSHC support Apache Spark 3.4 - Make Apache kyuubi `codecov` module contain the spark-3.4 profile. so that Apache kyubbi CI can cover some modules. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #4999 from Yikf/kudu-spark3.4. Closes #4999 6a35e54b8 [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala 66bb742eb [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala 7be517c7f [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala ae23133d1 [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala dda5e6521 [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala e43a25dff [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala 54f52f16d [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala 0955b544b [Cheng Pan] Update pom.xml 38a1383d9 [yikaifei] codecov module should contain the spark 3.4 profile Lead-authored-by: Cheng Pan <pan3793@gmail.com> Co-authored-by: yikaifei <yikaifei@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
04ee20bd19
commit
1d5ac07dfc
@ -209,5 +209,15 @@
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>spark-3.4</id>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.kyuubi</groupId>
|
||||
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
|
||||
@ -0,0 +1,256 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.spark.connector.hive
|
||||
|
||||
import org.apache.spark.SPARK_VERSION
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
|
||||
import org.apache.spark.sql.connector.catalog.TableChange
|
||||
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
|
||||
import org.apache.spark.sql.execution.command.CommandUtils
|
||||
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
|
||||
|
||||
import org.apache.kyuubi.spark.connector.common.SparkUtils
|
||||
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
|
||||
|
||||
object HiveConnectorUtils extends Logging {
|
||||
|
||||
def partitionedFilePath(file: PartitionedFile): String = {
|
||||
if (SparkUtils.isSparkVersionAtLeast("3.4")) {
|
||||
invokeAs[String](file, "urlEncodedPath")
|
||||
} else if (SparkUtils.isSparkVersionAtLeast("3.3")) {
|
||||
invokeAs[String](file, "filePath")
|
||||
} else {
|
||||
throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
|
||||
s"is not supported by Kyuubi spark hive connector.")
|
||||
}
|
||||
}
|
||||
|
||||
def calculateTotalSize(
|
||||
spark: SparkSession,
|
||||
catalogTable: CatalogTable,
|
||||
hiveTableCatalog: HiveTableCatalog): (BigInt, Seq[CatalogTablePartition]) = {
|
||||
val sessionState = spark.sessionState
|
||||
val startTime = System.nanoTime()
|
||||
val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
|
||||
(
|
||||
calculateSingleLocationSize(
|
||||
sessionState,
|
||||
catalogTable.identifier,
|
||||
catalogTable.storage.locationUri),
|
||||
Seq())
|
||||
} else {
|
||||
// Calculate table size as a sum of the visible partitions. See SPARK-21079
|
||||
val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
|
||||
logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
|
||||
val paths = partitions.map(_.storage.locationUri)
|
||||
val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
|
||||
val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
|
||||
val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
|
||||
newStats.map(_ => p.copy(stats = newStats))
|
||||
}
|
||||
(sizes.sum, newPartitions)
|
||||
}
|
||||
logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" +
|
||||
s" the total size for table ${catalogTable.identifier}.")
|
||||
(totalSize, newPartitions)
|
||||
}
|
||||
|
||||
def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
|
||||
changes.foldLeft(schema) { (schema, change) =>
|
||||
change match {
|
||||
case add: AddColumn =>
|
||||
add.fieldNames match {
|
||||
case Array(name) =>
|
||||
val field = StructField(name, add.dataType, nullable = add.isNullable)
|
||||
val newField = Option(add.comment).map(field.withComment).getOrElse(field)
|
||||
addField(schema, newField, add.position())
|
||||
|
||||
case names =>
|
||||
replace(
|
||||
schema,
|
||||
names.init,
|
||||
parent =>
|
||||
parent.dataType match {
|
||||
case parentType: StructType =>
|
||||
val field = StructField(names.last, add.dataType, nullable = add.isNullable)
|
||||
val newField = Option(add.comment).map(field.withComment).getOrElse(field)
|
||||
Some(parent.copy(dataType = addField(parentType, newField, add.position())))
|
||||
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
|
||||
})
|
||||
}
|
||||
|
||||
case rename: RenameColumn =>
|
||||
replace(
|
||||
schema,
|
||||
rename.fieldNames,
|
||||
field =>
|
||||
Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
|
||||
|
||||
case update: UpdateColumnType =>
|
||||
replace(
|
||||
schema,
|
||||
update.fieldNames,
|
||||
field => Some(field.copy(dataType = update.newDataType)))
|
||||
|
||||
case update: UpdateColumnNullability =>
|
||||
replace(
|
||||
schema,
|
||||
update.fieldNames,
|
||||
field => Some(field.copy(nullable = update.nullable)))
|
||||
|
||||
case update: UpdateColumnComment =>
|
||||
replace(
|
||||
schema,
|
||||
update.fieldNames,
|
||||
field => Some(field.withComment(update.newComment)))
|
||||
|
||||
case update: UpdateColumnPosition =>
|
||||
def updateFieldPos(struct: StructType, name: String): StructType = {
|
||||
val oldField = struct.fields.find(_.name == name).getOrElse {
|
||||
throw new IllegalArgumentException("Field not found: " + name)
|
||||
}
|
||||
val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
|
||||
addField(withFieldRemoved, oldField, update.position())
|
||||
}
|
||||
|
||||
update.fieldNames() match {
|
||||
case Array(name) =>
|
||||
updateFieldPos(schema, name)
|
||||
case names =>
|
||||
replace(
|
||||
schema,
|
||||
names.init,
|
||||
parent =>
|
||||
parent.dataType match {
|
||||
case parentType: StructType =>
|
||||
Some(parent.copy(dataType = updateFieldPos(parentType, names.last)))
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
|
||||
})
|
||||
}
|
||||
|
||||
case delete: DeleteColumn =>
|
||||
replace(schema, delete.fieldNames, _ => None, delete.ifExists)
|
||||
|
||||
case _ =>
|
||||
// ignore non-schema changes
|
||||
schema
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def addField(
|
||||
schema: StructType,
|
||||
field: StructField,
|
||||
position: ColumnPosition): StructType = {
|
||||
if (position == null) {
|
||||
schema.add(field)
|
||||
} else if (position.isInstanceOf[First]) {
|
||||
StructType(field +: schema.fields)
|
||||
} else {
|
||||
val afterCol = position.asInstanceOf[After].column()
|
||||
val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
|
||||
if (fieldIndex == -1) {
|
||||
throw new IllegalArgumentException("AFTER column not found: " + afterCol)
|
||||
}
|
||||
val (before, after) = schema.fields.splitAt(fieldIndex + 1)
|
||||
StructType(before ++ (field +: after))
|
||||
}
|
||||
}
|
||||
|
||||
private def replace(
|
||||
struct: StructType,
|
||||
fieldNames: Seq[String],
|
||||
update: StructField => Option[StructField],
|
||||
ifExists: Boolean = false): StructType = {
|
||||
|
||||
val posOpt = fieldNames.zipWithIndex.toMap.get(fieldNames.head)
|
||||
if (posOpt.isEmpty) {
|
||||
if (ifExists) {
|
||||
// We couldn't find the column to replace, but with IF EXISTS, we will silence the error
|
||||
// Currently only DROP COLUMN may pass down the IF EXISTS parameter
|
||||
return struct
|
||||
} else {
|
||||
throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}")
|
||||
}
|
||||
}
|
||||
|
||||
val pos = posOpt.get
|
||||
val field = struct.fields(pos)
|
||||
val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
|
||||
case (Seq(), _) =>
|
||||
update(field)
|
||||
|
||||
case (names, struct: StructType) =>
|
||||
val updatedType: StructType = replace(struct, names, update, ifExists)
|
||||
Some(StructField(field.name, updatedType, field.nullable, field.metadata))
|
||||
|
||||
case (Seq("key"), map @ MapType(keyType, _, _)) =>
|
||||
val updated = update(StructField("key", keyType, nullable = false))
|
||||
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
|
||||
Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
|
||||
|
||||
case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
|
||||
Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update, ifExists))))
|
||||
|
||||
case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
|
||||
val updated = update(StructField("value", mapValueType, nullable = isNullable))
|
||||
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
|
||||
Some(field.copy(dataType = map.copy(
|
||||
valueType = updated.dataType,
|
||||
valueContainsNull = updated.nullable)))
|
||||
|
||||
case (Seq("value", names @ _*), map @ MapType(_, valueStruct: StructType, _)) =>
|
||||
Some(field.copy(dataType = map.copy(valueType =
|
||||
replace(valueStruct, names, update, ifExists))))
|
||||
|
||||
case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
|
||||
val updated = update(StructField("element", elementType, nullable = isNullable))
|
||||
.getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
|
||||
Some(field.copy(dataType = array.copy(
|
||||
elementType = updated.dataType,
|
||||
containsNull = updated.nullable)))
|
||||
|
||||
case (Seq("element", names @ _*), array @ ArrayType(elementStruct: StructType, _)) =>
|
||||
Some(field.copy(dataType = array.copy(elementType =
|
||||
replace(elementStruct, names, update, ifExists))))
|
||||
|
||||
case (names, dataType) =>
|
||||
if (!ifExists) {
|
||||
throw new IllegalArgumentException(
|
||||
s"Cannot find field: ${names.head} in ${dataType.simpleString}")
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
val newFields = struct.fields.zipWithIndex.flatMap {
|
||||
case (_, index) if pos == index =>
|
||||
replacement
|
||||
case (other, _) =>
|
||||
Some(other)
|
||||
}
|
||||
|
||||
new StructType(newFields)
|
||||
}
|
||||
}
|
||||
@ -39,8 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
|
||||
import org.apache.spark.sql.execution.datasources.DataSource
|
||||
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
|
||||
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
|
||||
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
|
||||
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
|
||||
import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.util.CaseInsensitiveStringMap
|
||||
|
||||
@ -212,7 +211,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
|
||||
}
|
||||
|
||||
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
|
||||
val schema = CatalogV2Util.applySchemaChanges(
|
||||
val schema = HiveConnectorUtils.applySchemaChanges(
|
||||
catalogTable.schema,
|
||||
changes)
|
||||
val comment = properties.get(TableCatalog.PROP_COMMENT)
|
||||
|
||||
@ -26,6 +26,8 @@ import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupporte
|
||||
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.InputFileBlockHolder
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils
|
||||
|
||||
// scalastyle:off line.size.limit
|
||||
// copy from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
|
||||
// scalastyle:on line.size.limit
|
||||
@ -98,7 +100,10 @@ class FilePartitionReader[T](readers: Iterator[HivePartitionedFileReader[T]])
|
||||
logInfo(s"Reading file $reader")
|
||||
// Sets InputFileBlockHolder for the file block's information
|
||||
val file = reader.file
|
||||
InputFileBlockHolder.set(file.filePath, file.start, file.length)
|
||||
InputFileBlockHolder.set(
|
||||
HiveConnectorUtils.partitionedFilePath(file),
|
||||
file.start,
|
||||
file.length)
|
||||
reader
|
||||
}
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@ import org.apache.spark.TaskContext
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
|
||||
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
|
||||
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
|
||||
import org.apache.spark.sql.execution.datasources.v2._
|
||||
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator
|
||||
@ -40,6 +40,8 @@ import org.apache.spark.sql.sources.Filter
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.util.SerializableConfiguration
|
||||
|
||||
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils
|
||||
|
||||
case class HivePartitionReaderFactory(
|
||||
sqlConf: SQLConf,
|
||||
broadcastHiveConf: Broadcast[SerializableConfiguration],
|
||||
@ -49,7 +51,7 @@ case class HivePartitionReaderFactory(
|
||||
partitionSchema: StructType,
|
||||
partFileToHivePart: Map[PartitionedFile, HivePartition],
|
||||
pushedFilters: Array[Filter] = Array.empty)
|
||||
extends FilePartitionReaderFactory with Logging {
|
||||
extends PartitionReaderFactory with Logging {
|
||||
|
||||
private val charset: String =
|
||||
sqlConf.getConfString("hive.exec.default.charset", "utf-8")
|
||||
@ -57,10 +59,6 @@ case class HivePartitionReaderFactory(
|
||||
val tableDesc = HiveReader.getTableDec(hiveTable)
|
||||
val nonPartitionReadDataKeys = HiveReader.toAttributes(readDataSchema)
|
||||
|
||||
override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
|
||||
throw new UnsupportedOperationException("Cannot use buildReader directly.")
|
||||
}
|
||||
|
||||
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
|
||||
assert(partition.isInstanceOf[FilePartition])
|
||||
val filePartition = partition.asInstanceOf[FilePartition]
|
||||
@ -117,7 +115,7 @@ case class HivePartitionReaderFactory(
|
||||
|
||||
val jobConf = new JobConf(broadcastHiveConf.value.value)
|
||||
|
||||
val filePath = new Path(new URI(file.filePath))
|
||||
val filePath = new Path(new URI(HiveConnectorUtils.partitionedFilePath(file)))
|
||||
|
||||
if (tableDesc != null) {
|
||||
configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
|
||||
|
||||
@ -37,7 +37,7 @@ import org.apache.spark.sql.sources.Filter
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.util.SerializableConfiguration
|
||||
|
||||
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
|
||||
import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, KyuubiHiveConnectorException}
|
||||
|
||||
case class HiveScan(
|
||||
sparkSession: SparkSession,
|
||||
@ -88,7 +88,7 @@ case class HiveScan(
|
||||
}
|
||||
lazy val partitionValueProject =
|
||||
GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes)
|
||||
val splitFiles = selectedPartitions.flatMap { partition =>
|
||||
val splitFiles: Seq[PartitionedFile] = selectedPartitions.flatMap { partition =>
|
||||
val partitionValues =
|
||||
if (readPartitionAttributes != partitionAttributes) {
|
||||
partitionValueProject(partition.values).copy()
|
||||
@ -115,7 +115,7 @@ case class HiveScan(
|
||||
}
|
||||
|
||||
if (splitFiles.length == 1) {
|
||||
val path = new Path(splitFiles(0).filePath)
|
||||
val path = new Path(HiveConnectorUtils.partitionedFilePath(splitFiles(0)))
|
||||
if (!isSplitable(path) && splitFiles(0).length >
|
||||
sparkSession.sparkContext.getConf.getOption("spark.io.warning.largeFileThreshold")
|
||||
.getOrElse("1024000000").toLong) {
|
||||
|
||||
@ -28,13 +28,12 @@ import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
|
||||
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
|
||||
import org.apache.spark.sql.execution.command.CommandUtils
|
||||
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
|
||||
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
|
||||
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
|
||||
import org.apache.spark.sql.types.StringType
|
||||
|
||||
import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
|
||||
import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException}
|
||||
import org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
|
||||
|
||||
class HiveBatchWrite(
|
||||
@ -77,7 +76,8 @@ class HiveBatchWrite(
|
||||
val catalog = hiveTableCatalog.catalog
|
||||
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
|
||||
val newTable = catalog.getTableMetadata(table.identifier)
|
||||
val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
|
||||
val (newSize, _) =
|
||||
HiveConnectorUtils.calculateTotalSize(sparkSession, newTable, hiveTableCatalog)
|
||||
val newStats = CatalogStatistics(sizeInBytes = newSize)
|
||||
catalog.alterTableStats(table.identifier, Some(newStats))
|
||||
} else if (table.stats.nonEmpty) {
|
||||
|
||||
@ -47,6 +47,7 @@ object HiveBridgeHelper {
|
||||
val HadoopTableReader = org.apache.spark.sql.hive.HadoopTableReader
|
||||
val SparkHadoopUtil = org.apache.spark.deploy.SparkHadoopUtil
|
||||
val Utils = org.apache.spark.util.Utils
|
||||
val CatalogV2Implicits = org.apache.spark.sql.connector.catalog.CatalogV2Implicits
|
||||
|
||||
def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = {
|
||||
sc.listenerBus.post(event)
|
||||
|
||||
@ -27,9 +27,10 @@ import scala.util.Try
|
||||
import com.google.common.collect.Maps
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
|
||||
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
|
||||
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
|
||||
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
|
||||
import org.apache.spark.sql.connector.expressions.Transform
|
||||
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
|
||||
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
|
||||
import org.apache.spark.sql.util.CaseInsensitiveStringMap
|
||||
@ -119,7 +120,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
val exception = intercept[AnalysisException] {
|
||||
spark.table("hive.ns1.nonexistent_table")
|
||||
}
|
||||
assert(exception.message === "Table or view not found: hive.ns1.nonexistent_table")
|
||||
assert(exception.plan.exists { p =>
|
||||
p.exists(child => child.isInstanceOf[UnresolvedRelation])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,13 +134,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
|
||||
assert(catalog.listTables(Array("ns")).isEmpty)
|
||||
|
||||
catalog.createTable(ident1, schema, Array.empty, emptyProps)
|
||||
catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
|
||||
|
||||
assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
|
||||
assert(catalog.listTables(Array("ns2")).isEmpty)
|
||||
|
||||
catalog.createTable(ident3, schema, Array.empty, emptyProps)
|
||||
catalog.createTable(ident2, schema, Array.empty, emptyProps)
|
||||
catalog.createTable(ident3, schema, Array.empty[Transform], emptyProps)
|
||||
catalog.createTable(ident2, schema, Array.empty[Transform], emptyProps)
|
||||
|
||||
assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
|
||||
assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
|
||||
@ -157,10 +160,11 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
test("createTable") {
|
||||
assert(!catalog.tableExists(testIdent))
|
||||
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
|
||||
val table =
|
||||
catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
|
||||
|
||||
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
|
||||
assert(parsed == Seq("db", "test_table"))
|
||||
assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table"))
|
||||
assert(table.schema == schema)
|
||||
assert(filterV2TableProperties(table.properties) == Map())
|
||||
|
||||
@ -174,10 +178,10 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
|
||||
assert(!catalog.tableExists(testIdent))
|
||||
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty, properties)
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], properties)
|
||||
|
||||
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
|
||||
assert(parsed == Seq("db", "test_table"))
|
||||
assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table"))
|
||||
assert(table.schema == schema)
|
||||
assert(filterV2TableProperties(table.properties).asJava == properties)
|
||||
|
||||
@ -188,13 +192,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
test("createTable: table already exists") {
|
||||
assert(!catalog.tableExists(testIdent))
|
||||
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
|
||||
|
||||
val exc = intercept[TableAlreadyExistsException] {
|
||||
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
|
||||
catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
|
||||
}
|
||||
|
||||
assert(exc.message.contains(table.name()))
|
||||
assert(exc.message.contains(testIdent.name()))
|
||||
assert(exc.message.contains("already exists"))
|
||||
|
||||
assert(catalog.tableExists(testIdent))
|
||||
@ -204,7 +208,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
test("tableExists") {
|
||||
assert(!catalog.tableExists(testIdent))
|
||||
|
||||
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
|
||||
catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
|
||||
|
||||
assert(catalog.tableExists(testIdent))
|
||||
|
||||
@ -218,32 +222,48 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
assert(!catalog.tableExists(testIdent))
|
||||
|
||||
// default location
|
||||
val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
|
||||
val t1 = catalog.createTable(
|
||||
testIdent,
|
||||
schema,
|
||||
Array.empty[Transform],
|
||||
properties).asInstanceOf[HiveTable]
|
||||
assert(t1.catalogTable.location ===
|
||||
catalog.catalog.defaultTablePath(testIdent.asTableIdentifier))
|
||||
catalog.dropTable(testIdent)
|
||||
|
||||
// relative path
|
||||
properties.put(TableCatalog.PROP_LOCATION, "relative/path")
|
||||
val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
|
||||
val t2 = catalog.createTable(
|
||||
testIdent,
|
||||
schema,
|
||||
Array.empty[Transform],
|
||||
properties).asInstanceOf[HiveTable]
|
||||
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
|
||||
catalog.dropTable(testIdent)
|
||||
|
||||
// absolute path without scheme
|
||||
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
|
||||
val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
|
||||
val t3 = catalog.createTable(
|
||||
testIdent,
|
||||
schema,
|
||||
Array.empty[Transform],
|
||||
properties).asInstanceOf[HiveTable]
|
||||
assert(t3.catalogTable.location.toString === "file:/absolute/path")
|
||||
catalog.dropTable(testIdent)
|
||||
|
||||
// absolute path with scheme
|
||||
properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
|
||||
val t4 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
|
||||
val t4 = catalog.createTable(
|
||||
testIdent,
|
||||
schema,
|
||||
Array.empty[Transform],
|
||||
properties).asInstanceOf[HiveTable]
|
||||
assert(t4.catalogTable.location.toString === "file:/absolute/path")
|
||||
catalog.dropTable(testIdent)
|
||||
}
|
||||
|
||||
test("loadTable") {
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
|
||||
val loaded = catalog.loadTable(testIdent)
|
||||
|
||||
assert(table.name == loaded.name)
|
||||
@ -253,15 +273,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
|
||||
}
|
||||
|
||||
test("loadTable: table does not exist") {
|
||||
val exc = intercept[NoSuchTableException] {
|
||||
intercept[NoSuchTableException] {
|
||||
catalog.loadTable(testIdent)
|
||||
}
|
||||
|
||||
assert(exc.message.contains("Table or view 'test_table' not found in database 'db'"))
|
||||
}
|
||||
|
||||
test("invalidateTable") {
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
|
||||
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
|
||||
// Hive v2 don't cache table
|
||||
catalog.invalidateTable(testIdent)
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.kyuubi.spark.connector.hive
|
||||
|
||||
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
|
||||
class HiveQuerySuite extends KyuubiHiveTest {
|
||||
|
||||
@ -70,7 +71,10 @@ class HiveQuerySuite extends KyuubiHiveTest {
|
||||
| SELECT * FROM hive.ns1.tb1
|
||||
|""".stripMargin)
|
||||
}
|
||||
assert(e.getMessage().contains("Table or view not found: hive.ns1.tb1"))
|
||||
|
||||
assert(e.plan.exists { p =>
|
||||
p.exists(child => child.isInstanceOf[UnresolvedRelation])
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,7 +62,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
|
||||
val e = intercept[IllegalArgumentException] {
|
||||
sql(s"CREATE NAMESPACE $ns LOCATION ''")
|
||||
}
|
||||
assert(e.getMessage.contains("Can not create a Path from an empty string"))
|
||||
assert(e.getMessage.contains("Can not create a Path from an empty string") ||
|
||||
e.getMessage.contains("The location name cannot be empty string"))
|
||||
|
||||
val uri = new Path(path).toUri
|
||||
sql(s"CREATE NAMESPACE $ns LOCATION '$uri'")
|
||||
@ -83,7 +84,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
|
||||
val e = intercept[NamespaceAlreadyExistsException] {
|
||||
sql(s"CREATE NAMESPACE $ns")
|
||||
}
|
||||
assert(e.getMessage.contains(s"Namespace '$namespace' already exists"))
|
||||
assert(e.getMessage.contains(s"Namespace '$namespace' already exists") ||
|
||||
e.getMessage.contains(s"Cannot create schema `fakens` because it already exists"))
|
||||
|
||||
// The following will be no-op since the namespace already exists.
|
||||
Try { sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") }.isSuccess
|
||||
|
||||
@ -60,7 +60,8 @@ trait DropNamespaceSuiteBase extends DDLCommandTestUtils {
|
||||
val message = intercept[AnalysisException] {
|
||||
sql(s"DROP NAMESPACE $catalogName.unknown")
|
||||
}.getMessage
|
||||
assert(message.contains(s"'unknown' not found"))
|
||||
assert(message.contains(s"'unknown' not found") ||
|
||||
message.contains(s"The schema `unknown` cannot be found"))
|
||||
}
|
||||
|
||||
test("drop non-empty namespace with a non-cascading mode") {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user