[KYUUBI #4923] [ARROW] Update arguments of ArrowUtils#toArrowSchema function

### _Why are the changes needed?_

To adapt to Spark 3.5, the new conf `spark.sql.execution.arrow.useLargeVarType` was introduced in https://github.com/apache/spark/pull/39572

the signature of function `ArrowUtils#toArrowSchema` before

```scala
   def toArrowSchema(
       schema: StructType,
       timeZoneId: String,
       errorOnDuplicatedFieldNames: Boolean): Schema
```

after

```scala
  def toArrowSchema(
      schema: StructType,
      timeZoneId: String,
      errorOnDuplicatedFieldNames: Boolean,
      largeVarTypes: Boolean = false): Schema
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4923 from cfmcgrady/arrow-toArrowSchema.

Closes #4923

3806494a5 [Fu Chen] Update Arguments of ArrowUtils#toArrowSchema Function

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
Fu Chen 2023-06-05 19:21:48 +08:00 committed by fwang12
parent 3692e20b1f
commit f040d7ca25
2 changed files with 23 additions and 4 deletions

View File

@ -28,6 +28,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@ -185,6 +186,8 @@ class ArrowBasedExecuteStatement(
incrementalCollect,
handle) {
checkUseLargeVarType()
/**
 * Produces the statement result incrementally as a local iterator of Arrow batches.
 * Top-level complex-typed columns are first rendered via [[convertComplexType]].
 */
override protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
  val hiveStringDF = convertComplexType(resultDF)
  toArrowBatchLocalIterator(hiveStringDF)
}
@ -204,4 +207,17 @@ class ArrowBasedExecuteStatement(
// Delegates to convertTopLevelComplexTypeToHiveString, rendering top-level
// complex-typed columns as Hive-style strings; honours the session's
// timestampAsString setting.
private def convertComplexType(df: DataFrame): DataFrame =
  convertTopLevelComplexTypeToHiveString(df, timestampAsString)
/**
 * Fails fast when the Spark session enables Arrow large variable-width types,
 * which this engine does not support yet.
 *
 * @throws KyuubiSQLException if `spark.sql.execution.arrow.useLargeVarType` is true
 */
def checkUseLargeVarType(): Unit = {
  // TODO: largeVarType support, see SPARK-39979.
  val useLargeVarType = session.asInstanceOf[SparkSessionImpl].spark
    .conf
    .get("spark.sql.execution.arrow.useLargeVarType", "false")
    .toBoolean
  if (useLargeVarType) {
    // Fixed grammar of the user-facing error message ("not support now" ->
    // "is not supported now"); exception type and cause (null) unchanged.
    throw new KyuubiSQLException(
      "`spark.sql.execution.arrow.useLargeVarType = true` is not supported now.",
      null)
  }
}
}

View File

@ -64,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
"slice",
0,
Long.MaxValue)
val arrowSchema = toArrowSchema(schema, timeZoneId, true)
val arrowSchema = toArrowSchema(schema, timeZoneId, true, false)
vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator)
try {
val recordBatch = MessageSerializer.deserializeRecordBatch(
@ -242,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
context: TaskContext)
extends Iterator[Array[Byte]] {
protected val arrowSchema = toArrowSchema(schema, timeZoneId, true)
protected val arrowSchema = toArrowSchema(schema, timeZoneId, true, false)
private val allocator =
ArrowUtils.rootAllocator.newChildAllocator(
s"to${this.getClass.getSimpleName}",
@ -327,6 +327,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
"org.apache.spark.sql.util.ArrowUtils",
classOf[StructType],
classOf[String],
classOf[Boolean],
classOf[Boolean])
.build()
@ -336,12 +337,14 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
// NOTE(review): this hunk interleaves the pre-change and post-change diff lines
// (the lines ending in `): ArrowSchema = {` and `errorOnDuplicatedFieldNames)`
// appear in both old and new form); the post-change signature adds a
// `largeVarTypes` flag to match Spark 3.5's ArrowUtils#toArrowSchema.
private def toArrowSchema(
schema: StructType,
timeZone: String,
errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = {
errorOnDuplicatedFieldNames: JBoolean,
largeVarTypes: JBoolean): ArrowSchema = {
// Invokes org.apache.spark.sql.util.ArrowUtils#toArrowSchema reflectively
// (via the pre-built toArrowSchemaMethod handle), presumably so the same
// artifact can run against Spark versions with different method arities —
// TODO confirm against the surrounding builder code.
toArrowSchemaMethod.invoke[ArrowSchema](
ArrowUtils,
schema,
timeZone,
errorOnDuplicatedFieldNames)
errorOnDuplicatedFieldNames,
largeVarTypes)
}
// IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark-3.1/3.2