[KYUUBI #5384][KSCH] Hive connector supports Spark 3.5

# 🔍 Description
## Issue References 🔗

This pull request fixes #5384

## Describe Your Solution 🔧

Make `kyuubi-spark-connector-hive` work with Spark 3.5 while keeping compatibility with Spark 3.3 and 3.4:

- Add reflection-based shims in `HiveConnectorUtils` (via `DynClasses`/`DynConstructors`/`DynMethods`) to construct `HiveFileFormat` and to invoke `PartitionedFileUtil.splitFiles`, whose signatures differ between Spark 3.3/3.4 and 3.5.
- Build `HiveWrite` on Hive's `FileSinkDesc` and drop the `ShimFileSinkDesc` alias from `HiveBridgeHelper`.
- Re-implement `StructTypeHelper.toAttributes` without relying on `StructType.toAttributes`.
- Relax the "table or view not found" assertions in `HiveCatalogSuite` and `HiveQuerySuite` to accept both the new and the legacy error messages.
- Include the module in the `spark-3.5` Maven profile.
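
For context, a minimal usage sketch against Spark 3.5 is shown below. The catalog name `hive` mirrors the test suites and `HiveTableCatalog` is the connector's catalog implementation; the `hive.metastore.uris` option key and the URI value are illustrative assumptions, not taken from this change.

```scala
import org.apache.spark.sql.SparkSession

// Register the connector as a DataSource V2 catalog named "hive"
// (the same catalog name the test suites use).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.hive", "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
  // Assumption: the metastore URI is passed as a catalog option; adjust for your environment.
  .config("spark.sql.catalog.hive.hive.metastore.uris", "thrift://metastore-host:9083")
  .getOrCreate()

// Query a Hive table through the catalog; `db.tbl` is a placeholder.
spark.sql("SELECT * FROM hive.db.tbl").show()

spark.stop()
```
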
## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

`kyuubi-spark-connector-hive` is not built under the `spark-3.5` profile and cannot be used with Spark 3.5, because APIs such as the `HiveFileFormat` constructor and `PartitionedFileUtil.splitFiles` changed in Spark 3.5.

#### Behavior With This Pull Request 🎉

The connector builds and runs against Spark 3.3, 3.4, and 3.5, and the module is included in the `spark-3.5` Maven profile.

#### Related Unit Tests

Existing connector test suites; `HiveCatalogSuite` and `HiveQuerySuite` assertions are updated to also match the Spark 3.5 `TABLE_OR_VIEW_NOT_FOUND` error message.

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6133 from Kwafoor/kyuubi_6073.

Closes #5384

9234e35ad [Cheng Pan] fix
7766dfda5 [Cheng Pan] nit
e9da162f8 [Cheng Pan] nit
676bfb26e [Cheng Pan] pretty
c241859af [Cheng Pan] pretty
0eedcf82c [wangjunbo] compat with spark 3.3
3d866546c [wangjunbo] format code
a0898f50f [wangjunbo] delete Unused import
9577f7fe8 [wangjunbo] [KYUUBI #5384] kyuubi-spark-connector-hive supports Spark 3.5

Lead-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: wangjunbo <wangjunbo@qiyi.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Cheng Pan 2024-03-07 17:56:30 +08:00
parent 5bee05e45f
commit 8cc9b98e25
8 changed files with 117 additions and 18 deletions

@@ -17,23 +17,63 @@
package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging {
// SPARK-43186
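// Since Spark 3.5, HiveFileFormat's constructor takes Hive's FileSinkDesc directly,
// while Spark 3.3/3.4 still expect HiveShim.ShimFileSinkDesc, so the matching
// constructor is resolved reflectively at runtime.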
def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
if (SPARK_RUNTIME_VERSION >= "3.5") {
DynConstructors.builder()
.impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
.build[HiveFileFormat]()
.newInstance(fileSinkConf)
} else if (SPARK_RUNTIME_VERSION >= "3.3") {
val shimFileSinkDescClz = DynClasses.builder()
.impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
.build()
val shimFileSinkDesc = DynConstructors.builder()
.impl(
"org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc",
classOf[String],
classOf[TableDesc],
classOf[Boolean])
.build[AnyRef]()
.newInstance(
fileSinkConf.getDirName.toString,
fileSinkConf.getTableInfo,
fileSinkConf.getCompressed.asInstanceOf[JBoolean])
DynConstructors.builder()
.impl(classOf[HiveFileFormat], shimFileSinkDescClz)
.build[HiveFileFormat]()
.newInstance(shimFileSinkDesc)
} else {
throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
s"is not supported by Kyuubi spark hive connector.")
}
}
// SPARK-41970
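// Spark 3.4 changed PartitionedFile.filePath from a plain String to SparkPath,
// so the URL-encoded path is read reflectively where available.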
def partitionedFilePath(file: PartitionedFile): String = {
if (SPARK_RUNTIME_VERSION >= "3.4") {
invokeAs[String](file, "urlEncodedPath")
@@ -45,6 +85,62 @@ object HiveConnectorUtils extends Logging {
}
}
// SPARK-43039
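// Spark 3.5 changed PartitionedFileUtil.splitFiles to take a FileStatusWithMetadata
// and dropped the separate filePath parameter, so the matching overload is picked
// reflectively per Spark version.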
def splitFiles(
sparkSession: SparkSession,
file: AnyRef,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
if (SPARK_RUNTIME_VERSION >= "3.5") {
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
DynMethods
.builder("splitFiles")
.impl(
"org.apache.spark.sql.execution.PartitionedFileUtil",
classOf[SparkSession],
fileStatusWithMetadataClz,
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
.build()
.invoke[Seq[PartitionedFile]](
null,
sparkSession,
file,
isSplitable.asInstanceOf[JBoolean],
maxSplitBytes.asInstanceOf[JLong],
partitionValues)
} else if (SPARK_RUNTIME_VERSION >= "3.3") {
DynMethods
.builder("splitFiles")
.impl(
"org.apache.spark.sql.execution.PartitionedFileUtil",
classOf[SparkSession],
classOf[FileStatus],
classOf[Path],
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
.build()
.invoke[Seq[PartitionedFile]](
null,
sparkSession,
file,
filePath,
isSplitable.asInstanceOf[JBoolean],
maxSplitBytes.asInstanceOf[JLong],
partitionValues)
} else {
throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
s"is not supported by Kyuubi spark hive connector.")
}
}
def calculateTotalSize(
spark: SparkSession,
catalogTable: CatalogTable,

@@ -20,6 +20,7 @@ package org.apache.kyuubi.spark.connector.hive.read
import java.net.URI
import scala.collection.mutable
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -156,6 +157,8 @@ class HiveInMemoryFileIndex(
private val partDirToBindHivePart: mutable.Map[PartitionDirectory, CatalogTablePartition] =
mutable.Map()
implicit private def seqToArr(seq: Seq[FileStatus]): Array[FileStatus] = seq.toArray
override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {

@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
@@ -106,7 +105,7 @@ case class HiveScan(
}
partition.files.flatMap { file =>
val filePath = file.getPath
val partFiles = PartitionedFileUtil.splitFiles(
val partFiles = HiveConnectorUtils.splitFiles(
sparkSession = sparkSession,
file = file,
filePath = filePath,

@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.internal.Logging
@@ -36,11 +36,12 @@ import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.hive.execution.{HiveFileFormat, HiveOptions}
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{FileSinkDesc, HiveClientImpl, StructTypeHelper}
import org.apache.spark.sql.hive.execution.HiveOptions
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{HiveClientImpl, StructTypeHelper}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.getHiveFileFormat
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
case class HiveWrite(
@@ -75,7 +76,7 @@ case class HiveWrite(
override def toBatch: BatchWrite = {
val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val fileSinkConf = new FileSinkDesc(tmpLocation, tableDesc, false)
handleCompression(fileSinkConf, hadoopConf)
val committer = FileCommitProtocol.instantiate(
@@ -118,7 +119,7 @@
pathName: String,
customPartitionLocations: Map[TablePartitionSpec, String],
options: Map[String, String]): WriteJobDescription = {
val hiveFileFormat = new HiveFileFormat(fileSinkConf)
val hiveFileFormat = getHiveFileFormat(fileSinkConf)
val dataSchema = StructType(info.schema().fields.take(dataColumns.length))
val outputWriterFactory = hiveFileFormat.prepareWrite(sparkSession, job, options, dataSchema)
val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics

@@ -32,7 +32,6 @@ object HiveBridgeHelper {
type HiveMetastoreCatalog = org.apache.spark.sql.hive.HiveMetastoreCatalog
type HiveExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog
type NextIterator[U] = org.apache.spark.util.NextIterator[U]
type FileSinkDesc = org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc
type HiveVersion = org.apache.spark.sql.hive.client.HiveVersion
type InsertIntoHiveTable = org.apache.spark.sql.hive.execution.InsertIntoHiveTable
@@ -97,7 +96,9 @@
}
implicit class StructTypeHelper(structType: StructType) {
def toAttributes: Seq[AttributeReference] = structType.toAttributes
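// Avoid StructType.toAttributes, which is not available across all supported
// Spark versions; build the AttributeReferences directly instead.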
def toAttributes: Seq[AttributeReference] = structType.map { field =>
AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
}
}
def toSQLValue(v: Any, t: DataType): String = Literal.create(v, t) match {

@@ -27,7 +27,7 @@ import scala.util.Try
import com.google.common.collect.Maps
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
@@ -121,9 +121,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
val exception = intercept[AnalysisException] {
spark.table("hive.ns1.nonexistent_table")
}
assert(exception.plan.exists { p =>
p.exists(child => child.isInstanceOf[UnresolvedRelation])
})
assert(exception.message.contains("[TABLE_OR_VIEW_NOT_FOUND] " +
"The table or view `hive`.`ns1`.`nonexistent_table` cannot be found.")
|| exception.message.contains("Table or view not found: hive.ns1.nonexistent_table"))
}
}

@@ -18,7 +18,6 @@
package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
class HiveQuerySuite extends KyuubiHiveTest {
@@ -79,10 +78,9 @@ class HiveQuerySuite extends KyuubiHiveTest {
| SELECT * FROM hive.ns1.tb1
|""".stripMargin)
}
assert(e.plan.exists { p =>
p.exists(child => child.isInstanceOf[UnresolvedRelation])
})
assert(e.message.contains(
"[TABLE_OR_VIEW_NOT_FOUND] The table or view `hive`.`ns1`.`tb1` cannot be found.") ||
e.message.contains("Table or view not found: hive.ns1.tb1"))
}
}
}

@@ -2274,6 +2274,7 @@
<id>spark-3.5</id>
<modules>
<module>extensions/spark/kyuubi-extension-spark-3-5</module>
<module>extensions/spark/kyuubi-spark-connector-hive</module>
</modules>
<properties>
<delta.artifact>delta-spark</delta.artifact>