diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 659cf0a61..17d8a7412 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ +import org.apache.kyuubi.engine.spark.session.SparkSessionImpl import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -185,6 +186,8 @@ class ArrowBasedExecuteStatement( incrementalCollect, handle) { + checkUseLargeVarType() + override protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = { toArrowBatchLocalIterator(convertComplexType(resultDF)) } @@ -204,4 +207,17 @@ class ArrowBasedExecuteStatement( private def convertComplexType(df: DataFrame): DataFrame = { convertTopLevelComplexTypeToHiveString(df, timestampAsString) } + + def checkUseLargeVarType(): Unit = { + // TODO: largeVarType support, see SPARK-39979. + val useLargeVarType = session.asInstanceOf[SparkSessionImpl].spark + .conf + .get("spark.sql.execution.arrow.useLargeVarType", "false") + .toBoolean + if (useLargeVarType) { + throw new KyuubiSQLException( + "`spark.sql.execution.arrow.useLargeVarType = true` not support now.", + null) + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala index 35fa09b61..f78552602 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala @@ -64,7 +64,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { "slice", 0, Long.MaxValue) - val arrowSchema = toArrowSchema(schema, timeZoneId, true) + val arrowSchema = toArrowSchema(schema, timeZoneId, true, false) vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, sliceAllocator) try { val recordBatch = MessageSerializer.deserializeRecordBatch( @@ -242,7 +242,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { context: TaskContext) extends Iterator[Array[Byte]] { - protected val arrowSchema = toArrowSchema(schema, timeZoneId, true) + protected val arrowSchema = toArrowSchema(schema, timeZoneId, true, false) private val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"to${this.getClass.getSimpleName}", @@ -327,6 +327,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { "org.apache.spark.sql.util.ArrowUtils", classOf[StructType], classOf[String], + classOf[Boolean], classOf[Boolean]) .build() @@ -336,12 +337,14 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { private def toArrowSchema( schema: StructType, timeZone: String, - errorOnDuplicatedFieldNames: JBoolean): ArrowSchema = { + errorOnDuplicatedFieldNames: JBoolean, + largeVarTypes: JBoolean): ArrowSchema = { toArrowSchemaMethod.invoke[ArrowSchema]( ArrowUtils, schema, timeZone, - errorOnDuplicatedFieldNames) + errorOnDuplicatedFieldNames, + largeVarTypes) } // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark-3.1/3.2