From 47353911d219aab3460ad4ced14fc2483579b7cd Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 10 May 2023 20:17:04 +0800 Subject: [PATCH] [KYUUBI #4797] [ARROW] Reflective calls to the function `ArrowUtils#toArrowSchema` ### _Why are the changes needed?_ To adapt to Spark 3.5. The signature of the function `ArrowUtils#toArrowSchema` was changed in https://github.com/apache/spark/pull/40988 (since Spark 3.5). Spark 3.4 or earlier: ```scala def toArrowSchema(schema: StructType, timeZoneId: String): Schema ``` Spark 3.5 or later: ```scala def toArrowSchema( schema: StructType, timeZoneId: String, errorOnDuplicatedFieldNames: Boolean): Schema ``` Kyuubi is not affected by the issue of duplicated nested field names, as it consistently converts struct types to string types in Arrow mode. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request Closes #4797 from cfmcgrady/arrow-toArrowSchema. 
Closes #4797 2eb881b87 [Fu Chen] auto box f69e0b395 [Fu Chen] asInstanceOf[Object] -> new JBoolean(errorOnDuplicatedFieldNames) 84c0ed381 [Fu Chen] unnecessarily force conversions 5ca65df8e [Fu Chen] Revert "new JBoolean" 0f7a1b4bd [Fu Chen] new JBoolean 044ba421c [Fu Chen] update comment 989c3caf1 [Fu Chen] reflective call ArrowUtils.toArrowSchema Authored-by: Fu Chen Signed-off-by: Cheng Pan --- .../arrow/KyuubiArrowConverters.scala | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala index 8a34943cc..5930dcdfc 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.arrow import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.lang.{Boolean => JBoolean} import java.nio.channels.Channels import scala.collection.JavaConverters._ @@ -26,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel} import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} +import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -36,6 +38,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.util.Utils +import org.apache.kyuubi.reflection.DynMethods + object KyuubiArrowConverters extends SQLConfHelper with Logging { type Batch = (Array[Byte], Long) @@ -60,7 
+64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { "slice", 0, Long.MaxValue) - val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) + val arrowSchema = toArrowSchema(schema, timeZoneId, true) vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator) try { val recordBatch = MessageSerializer.deserializeRecordBatch( @@ -238,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { context: TaskContext) extends Iterator[Array[Byte]] { - protected val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) + protected val arrowSchema = toArrowSchema(schema, timeZoneId, true) private val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"to${this.getClass.getSimpleName}", @@ -312,6 +316,34 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { } } + // the signature of function [[ArrowUtils.toArrowSchema]] is changed in SPARK-41971 (since Spark + // 3.5) + private lazy val toArrowSchemaMethod = DynMethods.builder("toArrowSchema") + .impl( // for Spark 3.4 or previous + "org.apache.spark.sql.util.ArrowUtils", + classOf[StructType], + classOf[String]) + .impl( // for Spark 3.5 or later + "org.apache.spark.sql.util.ArrowUtils", + classOf[StructType], + classOf[String], + classOf[Boolean]) + .build() + + /** + * this function uses reflective calls to the [[ArrowUtils.toArrowSchema]]. + */ + private def toArrowSchema( + schema: StructType, + timeZone: String, + errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = { + toArrowSchemaMethod.invoke[ArrowSchema]( + ArrowUtils, + schema, + timeZone, + errorOnDuplicatedFieldNames) + } + // for testing def fromBatchIterator( arrowBatchIter: Iterator[Array[Byte]],