diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala index 8a34943cc..5930dcdfc 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.arrow import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.lang.{Boolean => JBoolean} import java.nio.channels.Channels import scala.collection.JavaConverters._ @@ -26,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel} import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} +import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -36,6 +38,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.util.Utils +import org.apache.kyuubi.reflection.DynMethods + object KyuubiArrowConverters extends SQLConfHelper with Logging { type Batch = (Array[Byte], Long) @@ -60,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { "slice", 0, Long.MaxValue) - val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) + val arrowSchema = toArrowSchema(schema, timeZoneId, true) vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator) try { val recordBatch = MessageSerializer.deserializeRecordBatch( @@ -238,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { context: TaskContext) extends Iterator[Array[Byte]] { - protected 
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) + protected val arrowSchema = toArrowSchema(schema, timeZoneId, true) private val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"to${this.getClass.getSimpleName}", @@ -312,6 +316,34 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { } } + // The signature of [[ArrowUtils.toArrowSchema]] changed in SPARK-41971 (since Spark 3.5), so + // both the old and the new parameter lists are registered for reflective lookup. + private lazy val toArrowSchemaMethod = DynMethods.builder("toArrowSchema") + .impl( // for Spark 3.4 or earlier + "org.apache.spark.sql.util.ArrowUtils", + classOf[StructType], + classOf[String]) + .impl( // for Spark 3.5 or later + "org.apache.spark.sql.util.ArrowUtils", + classOf[StructType], + classOf[String], + classOf[Boolean]) + .build() + + /** + * Calls [[ArrowUtils.toArrowSchema]] reflectively through `toArrowSchemaMethod`. + */ + private def toArrowSchema( + schema: StructType, + timeZone: String, + errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = { + toArrowSchemaMethod.invoke[ArrowSchema]( + ArrowUtils, + schema, + timeZone, + errorOnDuplicatedFieldNames) + } + // for testing def fromBatchIterator( arrowBatchIter: Iterator[Array[Byte]],