From f040d7ca25e87d1ae67d4ef38f24199a95da7f6e Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 5 Jun 2023 19:21:48 +0800 Subject: [PATCH] [KYUUBI #4923] [ARROW] Update arguments of `ArrowUtils#toArrowSchema` function ### _Why are the changes needed?_ To adapt to Spark 3.5: the new conf `spark.sql.execution.arrow.useLargeVarType` was introduced in https://github.com/apache/spark/pull/39572 the signature of function `ArrowUtils#toArrowSchema` before ```scala def toArrowSchema( schema: StructType, timeZoneId: String, errorOnDuplicatedFieldNames: Boolean): Schema ``` after ```scala def toArrowSchema( schema: StructType, timeZoneId: String, errorOnDuplicatedFieldNames: Boolean, largeVarTypes: Boolean = false): Schema ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request Closes #4923 from cfmcgrady/arrow-toArrowSchema. 
Closes #4923 3806494a5 [Fu Chen] Update Arguments of ArrowUtils#toArrowSchema Function Authored-by: Fu Chen Signed-off-by: fwang12 --- .../spark/operation/ExecuteStatement.scala | 16 ++++++++++++++++ .../execution/arrow/KyuubiArrowConverters.scala | 11 +++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 659cf0a61..17d8a7412 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ +import org.apache.kyuubi.engine.spark.session.SparkSessionImpl import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -185,6 +186,8 @@ class ArrowBasedExecuteStatement( incrementalCollect, handle) { + checkUseLargeVarType() + override protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = { toArrowBatchLocalIterator(convertComplexType(resultDF)) } @@ -204,4 +207,17 @@ class ArrowBasedExecuteStatement( private def convertComplexType(df: DataFrame): DataFrame = { convertTopLevelComplexTypeToHiveString(df, timestampAsString) } + + def checkUseLargeVarType(): Unit = { + // TODO: largeVarType support, see SPARK-39979. 
+ val useLargeVarType = session.asInstanceOf[SparkSessionImpl].spark + .conf + .get("spark.sql.execution.arrow.useLargeVarType", "false") + .toBoolean + if (useLargeVarType) { + throw new KyuubiSQLException( + "`spark.sql.execution.arrow.useLargeVarType = true` not support now.", + null) + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala index 35fa09b61..f78552602 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala @@ -64,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { "slice", 0, Long.MaxValue) - val arrowSchema = toArrowSchema(schema, timeZoneId, true) + val arrowSchema = toArrowSchema(schema, timeZoneId, true, false) vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator) try { val recordBatch = MessageSerializer.deserializeRecordBatch( @@ -242,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { context: TaskContext) extends Iterator[Array[Byte]] { - protected val arrowSchema = toArrowSchema(schema, timeZoneId, true) + protected val arrowSchema = toArrowSchema(schema, timeZoneId, true, false) private val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"to${this.getClass.getSimpleName}", @@ -327,6 +327,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { "org.apache.spark.sql.util.ArrowUtils", classOf[StructType], classOf[String], + classOf[Boolean], classOf[Boolean]) .build() @@ -336,12 +337,14 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { private def toArrowSchema( schema: StructType, timeZone: String, - errorOnDuplicatedFieldNames: JBoolean): 
ArrowSchema = { + errorOnDuplicatedFieldNames: JBoolean, + largeVarTypes: JBoolean): ArrowSchema = { toArrowSchemaMethod.invoke[ArrowSchema]( ArrowUtils, schema, timeZone, - errorOnDuplicatedFieldNames) + errorOnDuplicatedFieldNames, + largeVarTypes) } // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark-3.1/3.2