diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 5b347f243..2e33ca641 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -444,8 +444,8 @@ jobs:
# https://minikube.sigs.k8s.io/docs/handbook/pushing/#7-loading-directly-to-in-cluster-container-runtime
minikube image load apache/kyuubi:latest
# pre-install spark into minikube
- docker pull apache/spark:3.5.4
- minikube image load apache/spark:3.5.4
+ docker pull apache/spark:3.5.5
+ minikube image load apache/spark:3.5.5
- name: kubectl pre-check
run: |
kubectl get nodes
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 5a495c934..ba7f0eb79 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -181,8 +181,8 @@ Examples:
$0 -r docker.io/myrepo -t v1.8.1 build
$0 -r docker.io/myrepo -t v1.8.1 push
- - Build and push with tag "v1.8.1" and Spark-3.5.4 as base image to docker.io/myrepo
- $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.4 build
+ - Build and push with tag "v1.8.1" and Spark-3.5.5 as base image to docker.io/myrepo
+ $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.5 build
$0 -r docker.io/myrepo -t v1.8.1 push
- Build and push for multiple archs to docker.io/myrepo
diff --git a/docs/deployment/kyuubi_on_kubernetes.md b/docs/deployment/kyuubi_on_kubernetes.md
index 80f3e7509..0a81ab922 100644
--- a/docs/deployment/kyuubi_on_kubernetes.md
+++ b/docs/deployment/kyuubi_on_kubernetes.md
@@ -42,8 +42,8 @@ Examples:
$0 -r docker.io/myrepo -t v1.8.1 build
$0 -r docker.io/myrepo -t v1.8.1 push
- - Build and push with tag "v1.8.1" and Spark-3.5.4 as base image to docker.io/myrepo
- $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.4 build
+ - Build and push with tag "v1.8.1" and Spark-3.5.5 as base image to docker.io/myrepo
+ $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.5 build
$0 -r docker.io/myrepo -t v1.8.1 push
- Build and push for multiple archs to docker.io/myrepo
diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md
index a2414269c..a5d445863 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -117,7 +117,7 @@ Sometimes, it may be incompatible with other Spark distributions, then you may n
For example,
```shell
-build/mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.5.4
+build/mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.5.5
```
The available `spark.version`s are shown in the following table.
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 0ccfd4912..f56aa977b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong}
+import scala.util.Try
+
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging {
- // SPARK-43186
- def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+ Try { // SPARK-43186: 3.5.0
DynConstructors.builder()
.impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
.build[HiveFileFormat]()
.newInstance(fileSinkConf)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
val shimFileSinkDescClz = DynClasses.builder()
.impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
.build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
.impl(classOf[HiveFileFormat], shimFileSinkDescClz)
.build[HiveFileFormat]()
.newInstance(shimFileSinkDesc)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- // SPARK-41970
- def partitionedFilePath(file: PartitionedFile): String = {
- if (SPARK_RUNTIME_VERSION >= "3.4") {
+ def partitionedFilePath(file: PartitionedFile): String =
+ Try { // SPARK-41970: 3.4.0
invokeAs[String](file, "urlEncodedPath")
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
invokeAs[String](file, "filePath")
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
def splitFiles(
sparkSession: SparkSession,
file: AnyRef,
filePath: Path,
- isSplitable: Boolean,
- maxSplitBytes: Long,
- partitionValues: InternalRow): Seq[PartitionedFile] = {
-
- if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+ isSplitable: JBoolean,
+ maxSplitBytes: JLong,
+ partitionValues: InternalRow): Seq[PartitionedFile] =
+ Try { // SPARK-42821: 4.0.0-preview2
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
- .build()
+ .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
@@ -103,17 +94,40 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
file,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+    }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
- .build()
+ .buildChecked()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ classOf[SparkSession],
+ fileStatusWithMetadataClz,
+ classOf[Path],
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
+ null,
+ sparkSession,
+ file,
+ filePath,
+ isSplitable,
+ maxSplitBytes,
+ partitionValues)
+ }.recover { case _: Exception => // SPARK-43039: 3.5.0
+ val fileStatusWithMetadataClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
@@ -123,15 +137,15 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
DynMethods
.builder("splitFiles")
.impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
filePath,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+ Try { // SPARK-43039: 3.5.0
new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
.buildChecked()
.asStatic()
.invoke[PartitionDirectory](values, files.toArray)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
.buildChecked()
.asStatic()
.invoke[PartitionDirectory](values, files)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- def getPartitionFilePath(file: AnyRef): Path = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def getPartitionFilePath(file: AnyRef): Path =
+ Try { // SPARK-43039: 3.5.0
new DynMethods.Builder("getPath")
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
.invoke[Path](file)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
file.asInstanceOf[FileStatus].getPath
- } else {
- throw unsupportedSparkVersion()
- }
- }
-
- private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
- KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
- "is not supported by Kyuubi spark hive connector.")
- }
+ }.get
def calculateTotalSize(
spark: SparkSession,
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala
index 10565087a..0b5852fab 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala
@@ -56,7 +56,7 @@ class KyuubiOnKubernetesWithSparkTestsBase extends WithKyuubiServerOnKubernetes
Map(
"spark.master" -> s"k8s://$miniKubeApiMaster",
// We should update spark docker image in ./github/workflows/master.yml at the same time
- "spark.kubernetes.container.image" -> "apache/spark:3.5.4",
+ "spark.kubernetes.container.image" -> "apache/spark:3.5.5",
"spark.kubernetes.container.image.pullPolicy" -> "IfNotPresent",
"spark.executor.memory" -> "512M",
"spark.driver.memory" -> "1024M",
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 0aff5ca79..d6ef69d08 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -51,7 +51,7 @@ abstract class SparkOnKubernetesSuiteBase
// TODO Support more Spark version
// Spark official docker image: https://hub.docker.com/r/apache/spark/tags
KyuubiConf().set("spark.master", s"k8s://$apiServerAddress")
- .set("spark.kubernetes.container.image", "apache/spark:3.5.4")
+ .set("spark.kubernetes.container.image", "apache/spark:3.5.5")
.set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
.set("spark.executor.instances", "1")
.set("spark.executor.memory", "512M")
diff --git a/pom.xml b/pom.xml
index 2bd3e4d93..2c947bc8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,7 +200,7 @@
DO NOT forget to change the following properties when change the minor version of Spark:
`delta.version`, `delta.artifact`, `maven.plugin.scalatest.exclude.tags`
-->
- 3.5.4
+ 3.5.5
3.5
spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz
@@ -1932,7 +1932,7 @@
8
1.6.1
@@ -2044,7 +2044,7 @@
extensions/spark/kyuubi-spark-connector-hive
- 3.5.4
+ 3.5.5
3.5
3.3.0
delta-spark_${scala.binary.version}