[KYUUBI #4797] [ARROW] Reflective calls to the function ArrowUtils#toArrowSchema

### _Why are the changes needed?_

to adapt Spark 3.5

the signature of function `ArrowUtils#toArrowSchema` is changed in https://github.com/apache/spark/pull/40988 (since Spark3.5)

Spark 3.4 or previous

```scala
   def toArrowSchema(schema: StructType, timeZoneId: String): Schema
```

Spark 3.5 or later
```scala
   def toArrowSchema(
      schema: StructType,
      timeZoneId: String,
      errorOnDuplicatedFieldNames: Boolean): Schema
```

Kyuubi is not affected by the issue of duplicated nested field names, as it consistently converts struct types to string types in Arrow mode

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4797 from cfmcgrady/arrow-toArrowSchema.

Closes #4797

2eb881b87 [Fu Chen] auto box
f69e0b395 [Fu Chen] asInstanceOf[Object] -> new JBoolean(errorOnDuplicatedFieldNames)
84c0ed381 [Fu Chen] unnecessarily force conversions
5ca65df8e [Fu Chen] Revert "new JBoolean"
0f7a1b4bd [Fu Chen] new JBoolean
044ba421c [Fu Chen] update comment
989c3caf1 [Fu Chen] reflective call ArrowUtils.toArrowSchema

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Fu Chen 2023-05-10 20:17:04 +08:00 committed by Cheng Pan
parent e112e381ff
commit 47353911d2
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D

View File

@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.arrow
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.lang.{Boolean => JBoolean}
import java.nio.channels.Channels
import scala.collection.JavaConverters._
@ -26,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.arrow.vector._
import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel}
import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@ -36,6 +38,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.Utils
import org.apache.kyuubi.reflection.DynMethods
object KyuubiArrowConverters extends SQLConfHelper with Logging {
type Batch = (Array[Byte], Long)
@ -60,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
"slice",
0,
Long.MaxValue)
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
val arrowSchema = toArrowSchema(schema, timeZoneId, true)
vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator)
try {
val recordBatch = MessageSerializer.deserializeRecordBatch(
@ -238,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
context: TaskContext)
extends Iterator[Array[Byte]] {
protected val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
protected val arrowSchema = toArrowSchema(schema, timeZoneId, true)
private val allocator =
ArrowUtils.rootAllocator.newChildAllocator(
s"to${this.getClass.getSimpleName}",
@ -312,6 +316,34 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
}
}
// the signature of function [[ArrowUtils.toArrowSchema]] is changed in SPARK-41971 (since Spark
// 3.5)
private lazy val toArrowSchemaMethod = DynMethods.builder("toArrowSchema")
.impl( // for Spark 3.4 or previous
"org.apache.spark.sql.util.ArrowUtils",
classOf[StructType],
classOf[String])
.impl( // for Spark 3.5 or later
"org.apache.spark.sql.util.ArrowUtils",
classOf[StructType],
classOf[String],
classOf[Boolean])
.build()
/**
* this function uses reflective calls to the [[ArrowUtils.toArrowSchema]].
*/
private def toArrowSchema(
schema: StructType,
timeZone: String,
errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = {
toArrowSchemaMethod.invoke[ArrowSchema](
ArrowUtils,
schema,
timeZone,
errorOnDuplicatedFieldNames)
}
// for testing
def fromBatchIterator(
arrowBatchIter: Iterator[Array[Byte]],