diff --git a/dev/kyuubi-codecov/pom.xml b/dev/kyuubi-codecov/pom.xml index ba15ec0f8..3eb922129 100644 --- a/dev/kyuubi-codecov/pom.xml +++ b/dev/kyuubi-codecov/pom.xml @@ -209,5 +209,15 @@ + + spark-3.4 + + + org.apache.kyuubi + kyuubi-spark-connector-hive_${scala.binary.version} + ${project.version} + + + diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala new file mode 100644 index 000000000..1d2d2b319 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+
+import org.apache.kyuubi.spark.connector.common.SparkUtils
+import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
+
+/**
+ * Helper routines used by the Kyuubi Spark Hive connector: a version-aware accessor for
+ * [[PartitionedFile]] paths, table size calculation, and application of schema-related
+ * [[TableChange]]s to a [[StructType]].
+ */
+object HiveConnectorUtils extends Logging {
+
+  /**
+   * Returns the path of `file` as a string.
+   *
+   * The accessor is invoked reflectively and chosen by Spark version: `urlEncodedPath` on
+   * Spark 3.4 and above, `filePath` on Spark 3.3.
+   *
+   * @param file the partitioned file whose path is requested
+   * @return the file path as returned by the version-specific accessor
+   * @throws KyuubiHiveConnectorException if the running Spark version is older than 3.3
+   */
+  def partitionedFilePath(file: PartitionedFile): String = {
+    if (SparkUtils.isSparkVersionAtLeast("3.4")) {
+      invokeAs[String](file, "urlEncodedPath")
+    } else if (SparkUtils.isSparkVersionAtLeast("3.3")) {
+      invokeAs[String](file, "filePath")
+    } else {
+      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+        s"is not supported by Kyuubi spark hive connector.")
+    }
+  }
+
+  /**
+   * Calculates the total size of a table and logs how long the calculation took.
+   *
+   * For a table without partition columns the size of the table location is used. Otherwise
+   * the partitions are listed through `hiveTableCatalog` and their location sizes are summed.
+   *
+   * @param spark the active session
+   * @param catalogTable the table to measure
+   * @param hiveTableCatalog catalog used to list the partitions of a partitioned table
+   * @return the total size, together with copies of the partitions for which
+   *         `CommandUtils.compareAndGetNewStats` produced new statistics (always empty for a
+   *         table without partition columns)
+   */
+  def calculateTotalSize(
+      spark: SparkSession,
+      catalogTable: CatalogTable,
+      hiveTableCatalog: HiveTableCatalog): (BigInt, Seq[CatalogTablePartition]) = {
+    val sessionState = spark.sessionState
+    val startTime = System.nanoTime()
+    val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
+      (
+        calculateSingleLocationSize(
+          sessionState,
+          catalogTable.identifier,
+          catalogTable.storage.locationUri),
+        Seq())
+    } else {
+      // Calculate table size as a sum of the visible partitions. See SPARK-21079
+      val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
+      logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
+      val paths = partitions.map(_.storage.locationUri)
+      val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
+      val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
+        // Keep only the partitions whose statistics actually need to be updated.
+        val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
+        newStats.map(_ => p.copy(stats = newStats))
+      }
+      (sizes.sum, newPartitions)
+    }
+    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" +
+      s" the total size for table ${catalogTable.identifier}.")
+    (totalSize, newPartitions)
+  }
+
+  /**
+   * Applies the schema-related `changes` to `schema`, one after another, and returns the
+   * resulting schema. Changes that are not column changes are ignored.
+   *
+   * @param schema the schema to start from
+   * @param changes the table changes to apply, in order
+   * @return the schema after all column changes have been applied
+   * @throws IllegalArgumentException if a referenced field cannot be found, or if the parent
+   *                                  of a nested field is not a struct
+   */
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val field = StructField(name, add.dataType, nullable = add.isNullable)
+              val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+              addField(schema, newField, add.position())
+
+            case names =>
+              // Nested column: rewrite the parent struct with the new field added to it.
+              replace(
+                schema,
+                names.init,
+                parent =>
+                  parent.dataType match {
+                    case parentType: StructType =>
+                      val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+                      val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+                      Some(parent.copy(dataType = addField(parentType, newField, add.position())))
+
+                    case _ =>
+                      throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+                  })
+          }
+
+        case rename: RenameColumn =>
+          replace(
+            schema,
+            rename.fieldNames,
+            field =>
+              Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(
+            schema,
+            update.fieldNames,
+            field => Some(field.copy(dataType = update.newDataType)))
+
+        case update: UpdateColumnNullability =>
+          replace(
+            schema,
+            update.fieldNames,
+            field => Some(field.copy(nullable = update.nullable)))
+
+        case update: UpdateColumnComment =>
+          replace(
+            schema,
+            update.fieldNames,
+            field => Some(field.withComment(update.newComment)))
+
+        case update: UpdateColumnPosition =>
+          // Moving a field = removing it from its struct and re-adding it at the new position.
+          def updateFieldPos(struct: StructType, name: String): StructType = {
+            val oldField = struct.fields.find(_.name == name).getOrElse {
+              throw new IllegalArgumentException("Field not found: " + name)
+            }
+            val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
+            addField(withFieldRemoved, oldField, update.position())
+          }
+
+          update.fieldNames() match {
+            case Array(name) =>
+              updateFieldPos(schema, name)
+            case names =>
+              replace(
+                schema,
+                names.init,
+                parent =>
+                  parent.dataType match {
+                    case parentType: StructType =>
+                      Some(parent.copy(dataType = updateFieldPos(parentType, names.last)))
+                    case _ =>
+                      throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+                  })
+          }
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None, delete.ifExists)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  /**
+   * Returns a copy of `schema` with `field` inserted at `position`.
+   *
+   * @param schema the struct to extend
+   * @param field the field to insert
+   * @param position `null` to append at the end, `First` to insert at the front, or `After`
+   *                 to insert right behind the named column
+   * @throws IllegalArgumentException if the column named by an `After` position does not
+   *                                  exist in `schema`, or if the position is of an unknown kind
+   */
+  private def addField(
+      schema: StructType,
+      field: StructField,
+      position: ColumnPosition): StructType = {
+    position match {
+      case null =>
+        schema.add(field)
+      case _: First =>
+        StructType(field +: schema.fields)
+      case after: After =>
+        val afterCol = after.column()
+        val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
+        if (fieldIndex == -1) {
+          throw new IllegalArgumentException("AFTER column not found: " + afterCol)
+        }
+        val (before, rest) = schema.fields.splitAt(fieldIndex + 1)
+        StructType(before ++ (field +: rest))
+      case other =>
+        throw new IllegalArgumentException(s"Unsupported column position: $other")
+    }
+  }
+
+  /**
+   * Returns a copy of `struct` in which the field addressed by the path `fieldNames` has been
+   * replaced by the result of `update`, or removed when `update` returns `None`.
+   *
+   * The path may descend through nested structs, through map keys and values (path elements
+   * `key` / `value`) and through array elements (path element `element`).
+   *
+   * @param struct the struct to rewrite
+   * @param fieldNames non-empty path to the field, outermost name first
+   * @param update function producing the replacement field, or `None` to drop the field
+   * @param ifExists when true, a path that cannot be resolved is silently ignored instead of
+   *                 raising an error
+   * @throws IllegalArgumentException if the path cannot be resolved and `ifExists` is false, or
+   *                                  if `update` tries to delete a map key, map value or array
+   *                                  element
+   */
+  private def replace(
+      struct: StructType,
+      fieldNames: Seq[String],
+      update: StructField => Option[StructField],
+      ifExists: Boolean = false): StructType = {
+
+    // Resolve the first path element against the fields of `struct`. The lookup has to go
+    // through `struct.fieldNames`: the bare `fieldNames` in this scope is the path argument,
+    // and indexing that would always "find" the name (normally at index 0) and therefore
+    // rewrite the wrong column while never reporting a missing one.
+    val posOpt = struct.fieldNames.zipWithIndex.toMap.get(fieldNames.head)
+    if (posOpt.isEmpty) {
+      if (ifExists) {
+        // We couldn't find the column to replace, but with IF EXISTS, we will silence the error
+        // Currently only DROP COLUMN may pass down the IF EXISTS parameter
+        return struct
+      } else {
+        throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}")
+      }
+    }
+
+    val pos = posOpt.get
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
+      case (Seq(), _) =>
+        // End of the path: this is the field to update.
+        update(field)
+
+      case (names, struct: StructType) =>
+        val updatedType: StructType = replace(struct, names, update, ifExists)
+        Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+
+      case (Seq("key"), map @ MapType(keyType, _, _)) =>
+        val updated = update(StructField("key", keyType, nullable = false))
+          .getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
+        Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
+
+      case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
+        Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update, ifExists))))
+
+      case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
+        val updated = update(StructField("value", mapValueType, nullable = isNullable))
+          .getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
+        Some(field.copy(dataType = map.copy(
+          valueType = updated.dataType,
+          valueContainsNull = updated.nullable)))
+
+      case (Seq("value", names @ _*), map @ MapType(_, valueStruct: StructType, _)) =>
+        Some(field.copy(dataType = map.copy(valueType =
+          replace(valueStruct, names, update, ifExists))))
+
+      case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
+        val updated = update(StructField("element", elementType, nullable = isNullable))
+          .getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
+        Some(field.copy(dataType = array.copy(
+          elementType = updated.dataType,
+          containsNull = updated.nullable)))
+
+      case (Seq("element", names @ _*), array @ ArrayType(elementStruct: StructType, _)) =>
+        Some(field.copy(dataType = array.copy(elementType =
+          replace(elementStruct, names, update, ifExists))))
+
+      case (names, dataType) =>
+        // The path continues, but the current field has no such child.
+        if (!ifExists) {
+          throw new IllegalArgumentException(
+            s"Cannot find field: ${names.head} in ${dataType.simpleString}")
+        }
+        None
+    }
+
+    // Rebuild the struct, substituting (or dropping) only the field at `pos`.
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 44057be0a..cfc78940b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -39,8 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -212,7 +211,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
     val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
-    val schema = CatalogV2Util.applySchemaChanges(
+    val schema = HiveConnectorUtils.applySchemaChanges(
       catalogTable.schema,
       changes)
     val comment =
properties.get(TableCatalog.PROP_COMMENT) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala index 8ac90b3fe..13b6d4c20 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala @@ -26,6 +26,8 @@ import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupporte import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.InputFileBlockHolder import org.apache.spark.sql.internal.SQLConf +import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils + // scalastyle:off line.size.limit // copy from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala // scalastyle:on line.size.limit @@ -98,7 +100,10 @@ class FilePartitionReader[T](readers: Iterator[HivePartitionedFileReader[T]]) logInfo(s"Reading file $reader") // Sets InputFileBlockHolder for the file block's information val file = reader.file - InputFileBlockHolder.set(file.filePath, file.start, file.length) + InputFileBlockHolder.set( + HiveConnectorUtils.partitionedFilePath(file), + file.start, + file.length) reader } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala index 6770f4144..2b8e2ffd8 100644 --- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala @@ -31,7 +31,7 @@ import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator @@ -40,6 +40,8 @@ import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration +import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils + case class HivePartitionReaderFactory( sqlConf: SQLConf, broadcastHiveConf: Broadcast[SerializableConfiguration], @@ -49,7 +51,7 @@ case class HivePartitionReaderFactory( partitionSchema: StructType, partFileToHivePart: Map[PartitionedFile, HivePartition], pushedFilters: Array[Filter] = Array.empty) - extends FilePartitionReaderFactory with Logging { + extends PartitionReaderFactory with Logging { private val charset: String = sqlConf.getConfString("hive.exec.default.charset", "utf-8") @@ -57,10 +59,6 @@ case class HivePartitionReaderFactory( val tableDesc = HiveReader.getTableDec(hiveTable) val nonPartitionReadDataKeys = HiveReader.toAttributes(readDataSchema) - override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { - throw new UnsupportedOperationException("Cannot use buildReader directly.") - } - override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = { assert(partition.isInstanceOf[FilePartition]) val filePartition = partition.asInstanceOf[FilePartition] @@ -117,7 +115,7 @@ case class HivePartitionReaderFactory( val jobConf = new JobConf(broadcastHiveConf.value.value) - val filePath = new Path(new URI(file.filePath)) + val filePath = new Path(new URI(HiveConnectorUtils.partitionedFilePath(file))) if (tableDesc != null) { configureJobPropertiesForStorageHandler(tableDesc, jobConf, true) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala index 64fcf23f8..e71e428b7 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration -import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException +import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, KyuubiHiveConnectorException} case class HiveScan( sparkSession: SparkSession, @@ -88,7 +88,7 @@ case class HiveScan( } lazy val partitionValueProject = GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes) - val splitFiles = selectedPartitions.flatMap { partition => + val splitFiles: Seq[PartitionedFile] = selectedPartitions.flatMap { partition => val partitionValues = if (readPartitionAttributes != partitionAttributes) { partitionValueProject(partition.values).copy() @@ -115,7 +115,7 @@ case class HiveScan( } if (splitFiles.length == 1) { - val path = new Path(splitFiles(0).filePath) + val path = new 
Path(HiveConnectorUtils.partitionedFilePath(splitFiles(0))) if (!isSplitable(path) && splitFiles(0).length > sparkSession.sparkContext.getConf.getOption("spark.io.warning.largeFileThreshold") .getOrElse("1024000000").toLong) { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala index 625d79d0c..d12fc0efc 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala @@ -28,13 +28,12 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} -import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog} import org.apache.spark.sql.types.StringType -import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException} +import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException} import org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec class HiveBatchWrite( @@ -77,7 +76,8 @@ class HiveBatchWrite( val catalog = hiveTableCatalog.catalog if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { val newTable = catalog.getTableMetadata(table.identifier) - val newSize = 
CommandUtils.calculateTotalSize(sparkSession, newTable) + val (newSize, _) = + HiveConnectorUtils.calculateTotalSize(sparkSession, newTable, hiveTableCatalog) val newStats = CatalogStatistics(sizeInBytes = newSize) catalog.alterTableStats(table.identifier, Some(newStats)) } else if (table.stats.nonEmpty) { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala index 305c1450e..95def8656 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala @@ -47,6 +47,7 @@ object HiveBridgeHelper { val HadoopTableReader = org.apache.spark.sql.hive.HadoopTableReader val SparkHadoopUtil = org.apache.spark.deploy.SparkHadoopUtil val Utils = org.apache.spark.util.Utils + val CatalogV2Implicits = org.apache.spark.sql.connector.catalog.CatalogV2Implicits def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = { sc.listenerBus.post(event) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index 9088a6cfe..c1575018e 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -27,9 +27,10 @@ import scala.util.Try import com.google.common.collect.Maps import org.apache.hadoop.fs.Path import org.apache.spark.sql.AnalysisException -import 
org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._ import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -119,7 +120,9 @@ class HiveCatalogSuite extends KyuubiHiveTest { val exception = intercept[AnalysisException] { spark.table("hive.ns1.nonexistent_table") } - assert(exception.message === "Table or view not found: hive.ns1.nonexistent_table") + assert(exception.plan.exists { p => + p.exists(child => child.isInstanceOf[UnresolvedRelation]) + }) } } @@ -131,13 +134,13 @@ class HiveCatalogSuite extends KyuubiHiveTest { assert(catalog.listTables(Array("ns")).isEmpty) - catalog.createTable(ident1, schema, Array.empty, emptyProps) + catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1)) assert(catalog.listTables(Array("ns2")).isEmpty) - catalog.createTable(ident3, schema, Array.empty, emptyProps) - catalog.createTable(ident2, schema, Array.empty, emptyProps) + catalog.createTable(ident3, schema, Array.empty[Transform], emptyProps) + catalog.createTable(ident2, schema, Array.empty[Transform], emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2)) assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) @@ -157,10 +160,11 @@ class HiveCatalogSuite extends KyuubiHiveTest { test("createTable") { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val 
table = + catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) - assert(parsed == Seq("db", "test_table")) + assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table")) assert(table.schema == schema) assert(filterV2TableProperties(table.properties) == Map()) @@ -174,10 +178,10 @@ class HiveCatalogSuite extends KyuubiHiveTest { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, Array.empty[Transform], properties) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) - assert(parsed == Seq("db", "test_table")) + assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table")) assert(table.schema == schema) assert(filterV2TableProperties(table.properties).asJava == properties) @@ -188,13 +192,13 @@ class HiveCatalogSuite extends KyuubiHiveTest { test("createTable: table already exists") { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) val exc = intercept[TableAlreadyExistsException] { - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) } - assert(exc.message.contains(table.name())) + assert(exc.message.contains(testIdent.name())) assert(exc.message.contains("already exists")) assert(catalog.tableExists(testIdent)) @@ -204,7 +208,7 @@ class HiveCatalogSuite extends KyuubiHiveTest { test("tableExists") { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) assert(catalog.tableExists(testIdent)) @@ -218,32 
+222,48 @@ class HiveCatalogSuite extends KyuubiHiveTest { assert(!catalog.tableExists(testIdent)) // default location - val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable] + val t1 = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] assert(t1.catalogTable.location === catalog.catalog.defaultTablePath(testIdent.asTableIdentifier)) catalog.dropTable(testIdent) // relative path properties.put(TableCatalog.PROP_LOCATION, "relative/path") - val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable] + val t2 = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) catalog.dropTable(testIdent) // absolute path without scheme properties.put(TableCatalog.PROP_LOCATION, "/absolute/path") - val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable] + val t3 = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] assert(t3.catalogTable.location.toString === "file:/absolute/path") catalog.dropTable(testIdent) // absolute path with scheme properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path") - val t4 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable] + val t4 = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] assert(t4.catalogTable.location.toString === "file:/absolute/path") catalog.dropTable(testIdent) } test("loadTable") { - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) val loaded = catalog.loadTable(testIdent) assert(table.name == loaded.name) @@ -253,15 +273,13 @@ class 
HiveCatalogSuite extends KyuubiHiveTest { } test("loadTable: table does not exist") { - val exc = intercept[NoSuchTableException] { + intercept[NoSuchTableException] { catalog.loadTable(testIdent) } - - assert(exc.message.contains("Table or view 'test_table' not found in database 'db'")) } test("invalidateTable") { - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) // Hive v2 don't cache table catalog.invalidateTable(testIdent) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala index 16ea03234..d0b97676b 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.spark.connector.hive import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation class HiveQuerySuite extends KyuubiHiveTest { @@ -70,7 +71,10 @@ class HiveQuerySuite extends KyuubiHiveTest { | SELECT * FROM hive.ns1.tb1 |""".stripMargin) } - assert(e.getMessage().contains("Table or view not found: hive.ns1.tb1")) + + assert(e.plan.exists { p => + p.exists(child => child.isInstanceOf[UnresolvedRelation]) + }) } } } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala index 855eb0c67..e2e5b574b 100644 --- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala @@ -62,7 +62,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils { val e = intercept[IllegalArgumentException] { sql(s"CREATE NAMESPACE $ns LOCATION ''") } - assert(e.getMessage.contains("Can not create a Path from an empty string")) + assert(e.getMessage.contains("Can not create a Path from an empty string") || + e.getMessage.contains("The location name cannot be empty string")) val uri = new Path(path).toUri sql(s"CREATE NAMESPACE $ns LOCATION '$uri'") @@ -83,7 +84,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils { val e = intercept[NamespaceAlreadyExistsException] { sql(s"CREATE NAMESPACE $ns") } - assert(e.getMessage.contains(s"Namespace '$namespace' already exists")) + assert(e.getMessage.contains(s"Namespace '$namespace' already exists") || + e.getMessage.contains(s"Cannot create schema `fakens` because it already exists")) // The following will be no-op since the namespace already exists. 
Try { sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") }.isSuccess diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala index 66eb42c86..81107c24f 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala @@ -60,7 +60,8 @@ trait DropNamespaceSuiteBase extends DDLCommandTestUtils { val message = intercept[AnalysisException] { sql(s"DROP NAMESPACE $catalogName.unknown") }.getMessage - assert(message.contains(s"'unknown' not found")) + assert(message.contains(s"'unknown' not found") || + message.contains(s"The schema `unknown` cannot be found")) } test("drop non-empty namespace with a non-cascading mode") {