[KYUUBI #6453] Make KSHC support Spark 4.0 and enable CI for Spark 4.0

# 🔍 Description

This PR makes KSHC support Spark 4.0 and ensures that the KSHC jar compiled against Spark 3.5 remains binary compatible with Spark 4.0.

We are now ready to enable CI for Spark 4.0, except for the authz module.
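
For readers unfamiliar with how a single jar can span both Spark lines: version-specific APIs are resolved reflectively at runtime instead of being linked at compile time. Below is a trimmed, illustrative sketch of the pattern used in `HiveConnectorUtils.splitFiles` (it assumes Kyuubi's `DynClasses`/`DynMethods` reflection helpers; treat the import path as approximate, not part of this diff):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}

object SplitFilesSketch {
  // `FileStatusWithMetadata` only exists on Spark 3.5+, so it is looked up by name
  // rather than referenced directly.
  private val fileStatusWithMetadataClz = DynClasses.builder()
    .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
    .build()

  // The method handle is resolved against whatever `splitFiles` signature the runtime
  // Spark ships, so a jar compiled against Spark 3.5 can still call the Spark 4.0
  // variant (SPARK-42821) without a linkage error.
  private val splitFilesMethod = DynMethods.builder("splitFiles")
    .impl(
      "org.apache.spark.sql.execution.PartitionedFileUtil",
      fileStatusWithMetadataClz,
      classOf[Boolean],
      classOf[Long],
      classOf[InternalRow])
    .build()
}
```

The same lookup-by-name approach is why the Spark 3.5-built artifact can be reused on Spark 4.0 in the cross-version CI job added below.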

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Pass GHA.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6453 from pan3793/spark4-ci.

Closes #6453

695e3d7f7 [Cheng Pan] Update pom.xml
2eaa0f88a [Cheng Pan] Update .github/workflows/master.yml
b1f540a34 [Cheng Pan] cross test
562839982 [Cheng Pan] fix
9f0c2e1be [Cheng Pan] fix
45f182462 [Cheng Pan] kshc
227ef5bae [Cheng Pan] fix
690a3b8b2 [Cheng Pan] Revert "fix"
87fe7678b [Cheng Pan] fix
60f55dbed [Cheng Pan] CI for Spark 4.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Cheng Pan 2024-06-07 11:01:24 +08:00
parent fe5377e0fa
commit 03dcedd89e
6 changed files with 113 additions and 47 deletions


@@ -56,6 +56,11 @@ jobs:
exclude-tags: [""]
comment: ["normal"]
include:
- java: 17
spark: '4.0'
spark-archive: '-Pscala-2.13'
exclude-tags: ''
comment: 'normal'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
@@ -193,6 +198,12 @@ jobs:
- '3.4'
- '3.3'
comment: [ "normal" ]
include:
- java: 17
scala: "2.13"
spark-compile: "3.5"
spark-runtime: "4.0"
comment: "normal"
env:
SPARK_LOCAL_IP: localhost
TEST_MODULES: "extensions/spark/kyuubi-spark-connector-hive,\


@@ -121,12 +121,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-tpcds_${scala.binary.version}</artifactId>
@@ -169,6 +163,11 @@
<artifactId>kyuubi-extension-spark-3-2_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
@@ -184,6 +183,11 @@
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
@@ -209,6 +213,26 @@
<artifactId>kyuubi-extension-spark-3-5_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-4.0</id>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>


@@ -83,7 +83,6 @@ object HiveConnectorUtils extends Logging {
}
}
// SPARK-43039
def splitFiles(
sparkSession: SparkSession,
file: AnyRef,
@@ -92,7 +91,26 @@ object HiveConnectorUtils extends Logging {
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
if (SPARK_RUNTIME_VERSION >= "3.5") {
if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
DynMethods
.builder("splitFiles")
.impl(
"org.apache.spark.sql.execution.PartitionedFileUtil",
fileStatusWithMetadataClz,
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
.build()
.invoke[Seq[PartitionedFile]](
null,
file,
isSplitable.asInstanceOf[JBoolean],
maxSplitBytes.asInstanceOf[JLong],
partitionValues)
} else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
@@ -384,7 +402,13 @@ object HiveConnectorUtils extends Logging {
new StructType(newFields)
}
def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
// This is a fork of Spark's withSQLConf, using a different name to avoid linkage
// issues in cross-version cases.
// For example, SPARK-46227 (4.0.0) moves `withSQLConf` from SQLHelper to SQLConfHelper;
// classes that extend SQLConfHelper link against the superclass's method when compiled
// with Spark 4.0, and a linkage error then occurs when the jar runs on lower
// Spark versions.
def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map { key =>
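
The hunk above is truncated by the viewer; for context, here is an illustrative completion of the renamed helper, assuming it keeps the usual set-then-restore pattern of Spark's `SQLHelper.withSQLConf` (not necessarily the exact code merged):

```scala
import org.apache.spark.sql.internal.SQLConf

// Illustrative sketch: apply the given SQL confs, run the block, then restore the
// previous values (or unset keys that were not set before).
def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
  val conf = SQLConf.get
  val (keys, values) = pairs.unzip
  val currentValues = keys.map { key =>
    if (conf.contains(key)) Some(conf.getConfString(key)) else None
  }
  keys.zip(values).foreach { case (k, v) => conf.setConfString(k, v) }
  try f
  finally {
    keys.zip(currentValues).foreach {
      case (key, Some(value)) => conf.setConfString(key, value)
      case (key, None) => conf.unsetConf(key)
    }
  }
}
```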


@@ -44,7 +44,7 @@ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
@@ -148,7 +148,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override val defaultNamespace: Array[String] = Array("default")
override def listTables(namespace: Array[String]): Array[Identifier] =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
catalog
@@ -162,7 +162,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def loadTable(ident: Identifier): Table =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
}
@@ -171,7 +171,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -213,7 +213,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def alterTable(ident: Identifier, changes: TableChange*): Table =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
val catalogTable =
try {
catalog.getTableMetadata(ident.asTableIdentifier)
@@ -253,7 +253,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def dropTable(ident: Identifier): Boolean =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
try {
if (loadTable(ident) != null) {
catalog.dropTable(
@@ -271,7 +271,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
if (tableExists(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}
@@ -288,12 +288,12 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def listNamespaces(): Array[Array[String]] =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
catalog.listDatabases().map(Array(_)).toArray
}
override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array() =>
listNamespaces()
@@ -305,7 +305,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
try {
@@ -323,7 +323,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def createNamespace(
namespace: Array[String],
metadata: util.Map[String, String]): Unit =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) if !catalog.databaseExists(db) =>
catalog.createDatabase(
@@ -339,7 +339,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
// validate that this catalog's reserved properties are not removed
@@ -379,7 +379,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def dropNamespace(
namespace: Array[String],
cascade: Boolean): Boolean =
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) if catalog.databaseExists(db) =>
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)


@@ -28,7 +28,9 @@ import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion}
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveExternalCatalog
import org.apache.kyuubi.util.SemanticVersion
// scalastyle:off line.size.limit
/**
@@ -48,8 +50,6 @@ object HiveWriteHelper extends Logging {
hadoopConf: Configuration,
path: Path): Path = {
import hive._
// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
// a common scratch directory. After the writing is finished, Hive will simply empty the table
// directory and move the staging directory to it.
@@ -59,24 +59,15 @@
// We have to follow the Hive behavior here, to avoid troubles. For example, if we create
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
allSupportedHiveVersions)
val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
val hiveVersion = SemanticVersion(
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version.fullVersion)
val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
if (hiveVersion < "1.1") {
oldVersionExternalTempPath(path, hadoopConf, scratchDir)
} else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
newVersionExternalTempPath(path, hadoopConf, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
newVersionExternalTempPath(path, hadoopConf, stagingDir)
}
}
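
A small aside on the version check introduced above: instead of matching against Spark's internal `HiveVersion` constants (whose set changes across Spark releases), the helper now parses the Hive client's version string with Kyuubi's `SemanticVersion`. A hedged usage sketch, with `2.3.9` as a made-up example version:

```scala
import org.apache.kyuubi.util.SemanticVersion

// Illustrative only: SemanticVersion supports ordering comparisons against version strings,
// which is all the staging-path decision needs.
val hiveVersion = SemanticVersion("2.3.9")
val useOldExternalTempPath = hiveVersion < "1.1" // false: Hive 2.3.9 uses the new staging path
```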
@@ -96,7 +87,7 @@ object HiveWriteHelper extends Logging {
var dirPath = new Path(
extURI.getScheme,
extURI.getAuthority,
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID)
try {
val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
@@ -120,19 +111,12 @@
stagingDir: String): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
getExtTmpPathRelTo(path, hadoopConf, stagingDir)
new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
} else {
new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
}
}
private def getExtTmpPathRelTo(
path: Path,
hadoopConf: Configuration,
stagingDir: String): Path = {
new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
}
private def getExternalScratchDir(
extURI: URI,
hadoopConf: Configuration,

pom.xml

@@ -2036,6 +2036,29 @@
</properties>
</profile>
<profile>
<id>spark-4.0</id>
<modules>
<module>extensions/spark/kyuubi-spark-connector-hive</module>
</modules>
<properties>
<spark.version>4.0.0-preview1</spark.version>
<spark.binary.version>4.0</spark.binary.version>
<antlr4.version>4.13.1</antlr4.version>
<!-- TODO: update once Delta support Spark 4.0 -->
<delta.version>3.2.0</delta.version>
<delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>
<!-- TODO: update once Hudi support Spark 4.0 -->
<hudi.artifact>hudi-spark3.5-bundle_${scala.binary.version}</hudi.artifact>
<!-- TODO: update once Iceberg support Spark 4.0 -->
<iceberg.artifact>iceberg-spark-runtime-3.5_${scala.binary.version}</iceberg.artifact>
<!-- TODO: update once Paimon support Spark 4.0 -->
<paimon.artifact>paimon-spark-3.5</paimon.artifact>
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest</maven.plugin.scalatest.exclude.tags>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.tgz</spark.archive.name>
</properties>
</profile>
<profile>
<id>spark-master</id>
<properties>