[KYUUBI #6939] Bump Spark 3.5.5

### Why are the changes needed?

Test Spark 3.5.5 Release Notes

https://spark.apache.org/releases/spark-release-3-5-5.html

### How was this patch tested?

Pass GHA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #6939 from pan3793/spark-3.5.5.

Closes #6939

8c0288ae5 [Cheng Pan] ga
78b0e72db [Cheng Pan] nit
686a7b0a9 [Cheng Pan] fix
d40cc5bba [Cheng Pan] Bump Spark 3.5.5

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2025-03-03 13:42:09 +08:00
parent bfcf2e708f
commit d5b01fa3e2
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
8 changed files with 75 additions and 75 deletions

View File

@@ -444,8 +444,8 @@ jobs:
# https://minikube.sigs.k8s.io/docs/handbook/pushing/#7-loading-directly-to-in-cluster-container-runtime # https://minikube.sigs.k8s.io/docs/handbook/pushing/#7-loading-directly-to-in-cluster-container-runtime
minikube image load apache/kyuubi:latest minikube image load apache/kyuubi:latest
# pre-install spark into minikube # pre-install spark into minikube
docker pull apache/spark:3.5.4 docker pull apache/spark:3.5.5
minikube image load apache/spark:3.5.4 minikube image load apache/spark:3.5.5
- name: kubectl pre-check - name: kubectl pre-check
run: | run: |
kubectl get nodes kubectl get nodes

View File

@@ -181,8 +181,8 @@ Examples:
$0 -r docker.io/myrepo -t v1.8.1 build $0 -r docker.io/myrepo -t v1.8.1 build
$0 -r docker.io/myrepo -t v1.8.1 push $0 -r docker.io/myrepo -t v1.8.1 push
- Build and push with tag "v1.8.1" and Spark-3.5.4 as base image to docker.io/myrepo - Build and push with tag "v1.8.1" and Spark-3.5.5 as base image to docker.io/myrepo
$0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.4 build $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.5 build
$0 -r docker.io/myrepo -t v1.8.1 push $0 -r docker.io/myrepo -t v1.8.1 push
- Build and push for multiple archs to docker.io/myrepo - Build and push for multiple archs to docker.io/myrepo

View File

@@ -42,8 +42,8 @@ Examples:
$0 -r docker.io/myrepo -t v1.8.1 build $0 -r docker.io/myrepo -t v1.8.1 build
$0 -r docker.io/myrepo -t v1.8.1 push $0 -r docker.io/myrepo -t v1.8.1 push
- Build and push with tag "v1.8.1" and Spark-3.5.4 as base image to docker.io/myrepo - Build and push with tag "v1.8.1" and Spark-3.5.5 as base image to docker.io/myrepo
$0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.4 build $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.5 build
$0 -r docker.io/myrepo -t v1.8.1 push $0 -r docker.io/myrepo -t v1.8.1 push
- Build and push for multiple archs to docker.io/myrepo - Build and push for multiple archs to docker.io/myrepo

View File

@@ -117,7 +117,7 @@ Sometimes, it may be incompatible with other Spark distributions, then you may n
For example, For example,
```shell ```shell
build/mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.5.4 build/mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.5.5
``` ```
The available `spark.version`s are shown in the following table. The available `spark.version`s are shown in the following table.

View File

@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong} import java.lang.{Boolean => JBoolean, Long => JLong}
import scala.util.Try
import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType} import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize} import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile} import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods} import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging { object HiveConnectorUtils extends Logging {
// SPARK-43186 def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = { Try { // SPARK-43186: 3.5.0
if (SPARK_RUNTIME_VERSION >= "3.5") {
DynConstructors.builder() DynConstructors.builder()
.impl(classOf[HiveFileFormat], classOf[FileSinkDesc]) .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
.build[HiveFileFormat]() .build[HiveFileFormat]()
.newInstance(fileSinkConf) .newInstance(fileSinkConf)
} else if (SPARK_RUNTIME_VERSION >= "3.3") { }.recover { case _: Exception =>
val shimFileSinkDescClz = DynClasses.builder() val shimFileSinkDescClz = DynClasses.builder()
.impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc") .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
.build() .build()
@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
.impl(classOf[HiveFileFormat], shimFileSinkDescClz) .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
.build[HiveFileFormat]() .build[HiveFileFormat]()
.newInstance(shimFileSinkDesc) .newInstance(shimFileSinkDesc)
} else { }.get
throw unsupportedSparkVersion()
}
}
// SPARK-41970 def partitionedFilePath(file: PartitionedFile): String =
def partitionedFilePath(file: PartitionedFile): String = { Try { // SPARK-41970: 3.4.0
if (SPARK_RUNTIME_VERSION >= "3.4") {
invokeAs[String](file, "urlEncodedPath") invokeAs[String](file, "urlEncodedPath")
} else if (SPARK_RUNTIME_VERSION >= "3.3") { }.recover { case _: Exception =>
invokeAs[String](file, "filePath") invokeAs[String](file, "filePath")
} else { }.get
throw unsupportedSparkVersion()
}
}
def splitFiles( def splitFiles(
sparkSession: SparkSession, sparkSession: SparkSession,
file: AnyRef, file: AnyRef,
filePath: Path, filePath: Path,
isSplitable: Boolean, isSplitable: JBoolean,
maxSplitBytes: Long, maxSplitBytes: JLong,
partitionValues: InternalRow): Seq[PartitionedFile] = { partitionValues: InternalRow): Seq[PartitionedFile] =
Try { // SPARK-42821: 4.0.0-preview2
if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
val fileStatusWithMetadataClz = DynClasses.builder() val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata") .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build() .buildChecked()
DynMethods DynMethods
.builder("splitFiles") .builder("splitFiles")
.impl( .impl(
@ -103,17 +94,40 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean], classOf[Boolean],
classOf[Long], classOf[Long],
classOf[InternalRow]) classOf[InternalRow])
.build() .buildChecked()
.invoke[Seq[PartitionedFile]]( .invokeChecked[Seq[PartitionedFile]](
null, null,
file, file,
isSplitable.asInstanceOf[JBoolean], isSplitable,
maxSplitBytes.asInstanceOf[JLong], maxSplitBytes,
partitionValues) partitionValues)
} else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039 }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
val fileStatusWithMetadataClz = DynClasses.builder() val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata") .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build() .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
"org.apache.spark.sql.execution.PartitionedFileUtil",
classOf[SparkSession],
fileStatusWithMetadataClz,
classOf[Path],
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
.buildChecked()
.invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
filePath,
isSplitable,
maxSplitBytes,
partitionValues)
}.recover { case _: Exception => // SPARK-43039: 3.5.0
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.buildChecked()
DynMethods DynMethods
.builder("splitFiles") .builder("splitFiles")
.impl( .impl(
@ -123,15 +137,15 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean], classOf[Boolean],
classOf[Long], classOf[Long],
classOf[InternalRow]) classOf[InternalRow])
.build() .buildChecked()
.invoke[Seq[PartitionedFile]]( .invokeChecked[Seq[PartitionedFile]](
null, null,
sparkSession, sparkSession,
file, file,
isSplitable.asInstanceOf[JBoolean], isSplitable,
maxSplitBytes.asInstanceOf[JLong], maxSplitBytes,
partitionValues) partitionValues)
} else if (SPARK_RUNTIME_VERSION >= "3.3") { }.recover { case _: Exception =>
DynMethods DynMethods
.builder("splitFiles") .builder("splitFiles")
.impl( .impl(
@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean], classOf[Boolean],
classOf[Long], classOf[Long],
classOf[InternalRow]) classOf[InternalRow])
.build() .buildChecked()
.invoke[Seq[PartitionedFile]]( .invokeChecked[Seq[PartitionedFile]](
null, null,
sparkSession, sparkSession,
file, file,
filePath, filePath,
isSplitable.asInstanceOf[JBoolean], isSplitable,
maxSplitBytes.asInstanceOf[JLong], maxSplitBytes,
partitionValues) partitionValues)
} else { }.get
throw unsupportedSparkVersion()
}
}
def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = { def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
if (SPARK_RUNTIME_VERSION >= "3.5") { Try { // SPARK-43039: 3.5.0
new DynMethods.Builder("apply") new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]]) .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
.buildChecked() .buildChecked()
.asStatic() .asStatic()
.invoke[PartitionDirectory](values, files.toArray) .invoke[PartitionDirectory](values, files.toArray)
} else if (SPARK_RUNTIME_VERSION >= "3.3") { }.recover { case _: Exception =>
new DynMethods.Builder("apply") new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]]) .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
.buildChecked() .buildChecked()
.asStatic() .asStatic()
.invoke[PartitionDirectory](values, files) .invoke[PartitionDirectory](values, files)
} else { }.get
throw unsupportedSparkVersion()
}
}
def getPartitionFilePath(file: AnyRef): Path = { def getPartitionFilePath(file: AnyRef): Path =
if (SPARK_RUNTIME_VERSION >= "3.5") { Try { // SPARK-43039: 3.5.0
new DynMethods.Builder("getPath") new DynMethods.Builder("getPath")
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata") .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build() .build()
.invoke[Path](file) .invoke[Path](file)
} else if (SPARK_RUNTIME_VERSION >= "3.3") { }.recover { case _: Exception =>
file.asInstanceOf[FileStatus].getPath file.asInstanceOf[FileStatus].getPath
} else { }.get
throw unsupportedSparkVersion()
}
}
private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
"is not supported by Kyuubi spark hive connector.")
}
def calculateTotalSize( def calculateTotalSize(
spark: SparkSession, spark: SparkSession,

View File

@@ -56,7 +56,7 @@ class KyuubiOnKubernetesWithSparkTestsBase extends WithKyuubiServerOnKubernetes
Map( Map(
"spark.master" -> s"k8s://$miniKubeApiMaster", "spark.master" -> s"k8s://$miniKubeApiMaster",
// We should update spark docker image in ./github/workflows/master.yml at the same time // We should update spark docker image in ./github/workflows/master.yml at the same time
"spark.kubernetes.container.image" -> "apache/spark:3.5.4", "spark.kubernetes.container.image" -> "apache/spark:3.5.5",
"spark.kubernetes.container.image.pullPolicy" -> "IfNotPresent", "spark.kubernetes.container.image.pullPolicy" -> "IfNotPresent",
"spark.executor.memory" -> "512M", "spark.executor.memory" -> "512M",
"spark.driver.memory" -> "1024M", "spark.driver.memory" -> "1024M",

View File

@@ -51,7 +51,7 @@ abstract class SparkOnKubernetesSuiteBase
// TODO Support more Spark version // TODO Support more Spark version
// Spark official docker image: https://hub.docker.com/r/apache/spark/tags // Spark official docker image: https://hub.docker.com/r/apache/spark/tags
KyuubiConf().set("spark.master", s"k8s://$apiServerAddress") KyuubiConf().set("spark.master", s"k8s://$apiServerAddress")
.set("spark.kubernetes.container.image", "apache/spark:3.5.4") .set("spark.kubernetes.container.image", "apache/spark:3.5.5")
.set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent") .set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
.set("spark.executor.instances", "1") .set("spark.executor.instances", "1")
.set("spark.executor.memory", "512M") .set("spark.executor.memory", "512M")

View File

@@ -200,7 +200,7 @@
DO NOT forget to change the following properties when change the minor version of Spark: DO NOT forget to change the following properties when change the minor version of Spark:
`delta.version`, `delta.artifact`, `maven.plugin.scalatest.exclude.tags` `delta.version`, `delta.artifact`, `maven.plugin.scalatest.exclude.tags`
--> -->
<spark.version>3.5.4</spark.version> <spark.version>3.5.5</spark.version>
<spark.binary.version>3.5</spark.binary.version> <spark.binary.version>3.5</spark.binary.version>
<spark.archive.scala.suffix></spark.archive.scala.suffix> <spark.archive.scala.suffix></spark.archive.scala.suffix>
<spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name> <spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name>
@@ -1932,7 +1932,7 @@
<java.version>8</java.version> <java.version>8</java.version>
<!-- <!--
Iceberg drops support for Java 8 since 1.7.0. Iceberg drops support for Java 8 since 1.7.0.
And it may have compatible issue with Spark 3.5.4, see Iceberg #11731 And it may have compatible issue with Spark 3.5.4+, see Iceberg #11731
--> -->
<iceberg.version>1.6.1</iceberg.version> <iceberg.version>1.6.1</iceberg.version>
</properties> </properties>
@@ -2044,7 +2044,7 @@
<module>extensions/spark/kyuubi-spark-connector-hive</module> <module>extensions/spark/kyuubi-spark-connector-hive</module>
</modules> </modules>
<properties> <properties>
<spark.version>3.5.4</spark.version> <spark.version>3.5.5</spark.version>
<spark.binary.version>3.5</spark.binary.version> <spark.binary.version>3.5</spark.binary.version>
<delta.version>3.3.0</delta.version> <delta.version>3.3.0</delta.version>
<delta.artifact>delta-spark_${scala.binary.version}</delta.artifact> <delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>