From d5b01fa3e2428f4a512632ab64238d7313666f83 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 3 Mar 2025 13:42:09 +0800
Subject: [PATCH] [KYUUBI #6939] Bump Spark 3.5.5

### Why are the changes needed?

Test Spark 3.5.5

Release Notes: https://spark.apache.org/releases/spark-release-3-5-5.html

### How was this patch tested?

Pass GHA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #6939 from pan3793/spark-3.5.5.

Closes #6939

8c0288ae5 [Cheng Pan] ga
78b0e72db [Cheng Pan] nit
686a7b0a9 [Cheng Pan] fix
d40cc5bba [Cheng Pan] Bump Spark 3.5.5

Authored-by: Cheng Pan
Signed-off-by: Cheng Pan
---
 .github/workflows/master.yml                  |   4 +-
 bin/docker-image-tool.sh                      |   4 +-
 docs/deployment/kyuubi_on_kubernetes.md       |   4 +-
 docs/extensions/engines/spark/lineage.md      |   2 +-
 .../connector/hive/HiveConnectorUtils.scala   | 126 +++++++++---------
 .../KyuubiOnKubernetesTestsSuite.scala        |   2 +-
 .../spark/SparkOnKubernetesTestsSuite.scala   |   2 +-
 pom.xml                                       |   6 +-
 8 files changed, 75 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 5b347f243..2e33ca641 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -444,8 +444,8 @@ jobs:
           # https://minikube.sigs.k8s.io/docs/handbook/pushing/#7-loading-directly-to-in-cluster-container-runtime
           minikube image load apache/kyuubi:latest
           # pre-install spark into minikube
-          docker pull apache/spark:3.5.4
-          minikube image load apache/spark:3.5.4
+          docker pull apache/spark:3.5.5
+          minikube image load apache/spark:3.5.5
       - name: kubectl pre-check
         run: |
           kubectl get nodes
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 5a495c934..ba7f0eb79 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -181,8 +181,8 @@ Examples:
     $0 -r docker.io/myrepo -t v1.8.1 build
     $0 -r docker.io/myrepo -t v1.8.1 push

-  - Build and push with tag "v1.8.1" and Spark-3.5.4 as base image to docker.io/myrepo
-    $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.4 build
+  - Build and push with tag "v1.8.1" and Spark-3.5.5 as base image to docker.io/myrepo
+    $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.5 build
     $0 -r docker.io/myrepo -t v1.8.1 push

   - Build and push for multiple archs to docker.io/myrepo
diff --git a/docs/deployment/kyuubi_on_kubernetes.md b/docs/deployment/kyuubi_on_kubernetes.md
index 80f3e7509..0a81ab922 100644
--- a/docs/deployment/kyuubi_on_kubernetes.md
+++ b/docs/deployment/kyuubi_on_kubernetes.md
@@ -42,8 +42,8 @@ Examples:
     $0 -r docker.io/myrepo -t v1.8.1 build
     $0 -r docker.io/myrepo -t v1.8.1 push

-  - Build and push with tag "v1.8.1" and Spark-3.5.4 as base image to docker.io/myrepo
-    $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.4 build
+  - Build and push with tag "v1.8.1" and Spark-3.5.5 as base image to docker.io/myrepo
+    $0 -r docker.io/myrepo -t v1.8.1 -b BASE_IMAGE=repo/spark:3.5.5 build
     $0 -r docker.io/myrepo -t v1.8.1 push

   - Build and push for multiple archs to docker.io/myrepo
diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md
index a2414269c..a5d445863 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -117,7 +117,7 @@ Sometimes, it may be incompatible with other Spark distributions, then you may n
 For example,

 ```shell
-build/mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.5.4
+build/mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.5.5
 ```

 The available `spark.version`s are shown in the following table.
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 0ccfd4912..f56aa977b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive

 import java.lang.{Boolean => JBoolean, Long => JLong}

+import scala.util.Try
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}

-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
 import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

 object HiveConnectorUtils extends Logging {

-  // SPARK-43186
-  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+    Try { // SPARK-43186: 3.5.0
       DynConstructors.builder()
         .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
         .build[HiveFileFormat]()
         .newInstance(fileSinkConf)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       val shimFileSinkDescClz = DynClasses.builder()
         .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
         .build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
         .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
         .build[HiveFileFormat]()
         .newInstance(shimFileSinkDesc)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get

-  // SPARK-41970
-  def partitionedFilePath(file: PartitionedFile): String = {
-    if (SPARK_RUNTIME_VERSION >= "3.4") {
+  def partitionedFilePath(file: PartitionedFile): String =
+    Try { // SPARK-41970: 3.4.0
       invokeAs[String](file, "urlEncodedPath")
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       invokeAs[String](file, "filePath")
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get

   def splitFiles(
       sparkSession: SparkSession,
       file: AnyRef,
       filePath: Path,
-      isSplitable: Boolean,
-      maxSplitBytes: Long,
-      partitionValues: InternalRow): Seq[PartitionedFile] = {
-
-    if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+      isSplitable: JBoolean,
+      maxSplitBytes: JLong,
+      partitionValues: InternalRow): Seq[PartitionedFile] =
+    Try { // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -103,17 +94,40 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+    }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          classOf[SparkSession],
+          fileStatusWithMetadataClz,
+          classOf[Path],
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          sparkSession,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-43039: 3.5.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -123,15 +137,15 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
           filePath,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get

-  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files.toArray)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get

-  def getPartitionFilePath(file: AnyRef): Path = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getPartitionFilePath(file: AnyRef): Path =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("getPath")
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .build()
         .invoke[Path](file)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       file.asInstanceOf[FileStatus].getPath
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
-
-  private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
-    KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-      "is not supported by Kyuubi spark hive connector.")
-  }
+    }.get

   def calculateTotalSize(
       spark: SparkSession,
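The heart of this patch is the refactor above: every version gate on `SPARK_RUNTIME_VERSION` becomes a `Try { ... }.recover { ... }.get` chain that probes the newest API shape first and falls back reflectively, so the connector no longer needs `unsupportedSparkVersion()` and keeps working when a patch release reshapes an API. Below is a minimal, self-contained sketch of that pattern, mirroring `partitionedFilePath`; it uses plain `java.lang.reflect` in place of Kyuubi's `DynMethods` helpers, and the object name is hypothetical.

```scala
import scala.util.Try

object ReflectiveFallback {
  // Probe-newest-first accessor in the style of HiveConnectorUtils:
  // on Spark 3.4+ the string path of a PartitionedFile is exposed as
  // urlEncodedPath (SPARK-41970), while 3.3 exposed filePath, so try
  // the new accessor and fall back to the old one on any failure.
  def partitionedFilePath(file: AnyRef): String =
    Try {
      // Spark 3.4+ shape
      file.getClass.getMethod("urlEncodedPath").invoke(file).asInstanceOf[String]
    }.recover { case _: Exception =>
      // Spark 3.3 shape
      file.getClass.getMethod("filePath").invoke(file).asInstanceOf[String]
    }.get
}
```

This probe-and-recover style is what lets the connector absorb SPARK-51185, where Spark 3.5.5 reshaped `PartitionedFileUtil.splitFiles` again: the new 3.5.5 branch simply slots in as one more `recover` step ahead of the 3.5.0 shape.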
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala
index 10565087a..0b5852fab 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/deployment/KyuubiOnKubernetesTestsSuite.scala
@@ -56,7 +56,7 @@ class KyuubiOnKubernetesWithSparkTestsBase extends WithKyuubiServerOnKubernetes
     Map(
       "spark.master" -> s"k8s://$miniKubeApiMaster",
       // We should update spark docker image in ./github/workflows/master.yml at the same time
-      "spark.kubernetes.container.image" -> "apache/spark:3.5.4",
+      "spark.kubernetes.container.image" -> "apache/spark:3.5.5",
       "spark.kubernetes.container.image.pullPolicy" -> "IfNotPresent",
       "spark.executor.memory" -> "512M",
       "spark.driver.memory" -> "1024M",
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 0aff5ca79..d6ef69d08 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -51,7 +51,7 @@ abstract class SparkOnKubernetesSuiteBase
     // TODO Support more Spark version
     // Spark official docker image: https://hub.docker.com/r/apache/spark/tags
     KyuubiConf().set("spark.master", s"k8s://$apiServerAddress")
-      .set("spark.kubernetes.container.image", "apache/spark:3.5.4")
+      .set("spark.kubernetes.container.image", "apache/spark:3.5.5")
       .set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
       .set("spark.executor.instances", "1")
       .set("spark.executor.memory", "512M")
diff --git a/pom.xml b/pom.xml
index 2bd3e4d93..2c947bc8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,7 +200,7 @@
       DO NOT forget to change the following properties when change the minor version of Spark:
       `delta.version`, `delta.artifact`, `maven.plugin.scalatest.exclude.tags`
     -->
-    <spark.version>3.5.4</spark.version>
+    <spark.version>3.5.5</spark.version>
     <spark.binary.version>3.5</spark.binary.version>
     <spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name>
@@ -1932,7 +1932,7 @@
     8
     1.6.1
@@ -2044,7 +2044,7 @@
         <module>extensions/spark/kyuubi-spark-connector-hive</module>
       </modules>
       <properties>
-        <spark.version>3.5.4</spark.version>
+        <spark.version>3.5.5</spark.version>
         <spark.binary.version>3.5</spark.binary.version>
         <delta.version>3.3.0</delta.version>
         <delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>
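One subtlety in the `splitFiles` chain is worth noting: the fallbacks are ordered newest-first, probing the Spark 4.0 shape (SPARK-42821), then the 3.5.5 shape that takes a `filePath` parameter again (SPARK-51185), then the 3.5.0 shape without it (SPARK-43039), then 3.3. A small self-contained sketch of the same idea follows, with a hypothetical `firstAvailable` helper standing in for Kyuubi's `DynMethods`.

```scala
import scala.util.Try

object SignatureProbe {
  // Hypothetical helper: resolve the first method signature that exists
  // on a class, trying candidate parameter lists newest-first, the way
  // splitFiles above probes the 4.0, 3.5.5, 3.5.0, and 3.3 shapes in order.
  def firstAvailable(
      className: String,
      method: String,
      candidates: Seq[Seq[Class[_]]]): java.lang.reflect.Method = {
    val clz = Class.forName(className)
    candidates.view
      .flatMap(sig => Try(clz.getMethod(method, sig: _*)).toOption)
      .headOption
      .getOrElse(throw new NoSuchMethodException(s"$className.$method"))
  }
}
```

A caller passes the candidate parameter lists newest-first; on a Spark 3.5.4 runtime the 3.5.5 probe simply fails and resolution falls through to the 3.5.0 shape, with no version-string comparison anywhere.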