diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index c9a5f21dd..d4418ec26 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -262,6 +262,11 @@ object SparkSQLEngine extends Logging { val rootDir = _sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(_sparkConf)) val outputDir = Utils.createTempDir(prefix = "repl", root = rootDir) _sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true") + // SPARK-47911: we must set a value instead of leaving it as None; otherwise, we will get a + // "Cannot mutate ReadOnlySQLConf" exception when a task calls HiveResult.getBinaryFormatter. + // Here we follow the HiveResult.getBinaryFormatter behavior and set it to UTF8 if the configuration + // is absent, to preserve the legacy behavior for compatibility. 
+ _sparkConf.setIfMissing("spark.sql.binaryOutputStyle", "UTF8") _sparkConf.setIfMissing("spark.master", "local") _sparkConf.set( "spark.redaction.regex", diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala index c5f322108..47e6351b2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala @@ -17,17 +17,43 @@ package org.apache.kyuubi.engine.spark.schema +import java.lang.{Boolean => JBoolean} + import org.apache.spark.sql.execution.HiveResult import org.apache.spark.sql.execution.HiveResult.TimeFormatters import org.apache.spark.sql.types._ +import org.apache.kyuubi.util.reflect.DynMethods + object RowSet { + // SPARK-47911 (4.0.0) introduced it + type BinaryFormatter = Array[Byte] => String + + def getBinaryFormatter: BinaryFormatter = + DynMethods.builder("getBinaryFormatter") + .impl(HiveResult.getClass) // for Spark 4.0 and later + .orNoop() // for Spark 3.5 and before + .buildChecked(HiveResult) + .invokeChecked[BinaryFormatter]() + def toHiveString( valueAndType: (Any, DataType), - nested: Boolean = false, - timeFormatters: TimeFormatters): String = { - HiveResult.toHiveString(valueAndType, nested, timeFormatters) - } - + nested: JBoolean = false, + timeFormatters: TimeFormatters, + binaryFormatter: BinaryFormatter): String = + DynMethods.builder("toHiveString") + .impl( // for Spark 3.5 and before + HiveResult.getClass, + classOf[(Any, DataType)], + classOf[Boolean], + classOf[TimeFormatters]) + .impl( // for Spark 4.0 and later + HiveResult.getClass, + classOf[(Any, DataType)], + classOf[Boolean], + classOf[TimeFormatters], + classOf[BinaryFormatter]) + .buildChecked(HiveResult) + .invokeChecked[String](valueAndType, nested, 
timeFormatters, binaryFormatter) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala index 1d1b5ef6a..aa6c3383f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala @@ -27,8 +27,9 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift._ class SparkTRowSetGenerator extends TRowSetGenerator[StructType, Row, DataType] { - // reused time formatters in single RowSet generation, see KYUUBI-5811 + // reused time formatters in single RowSet generation, see KYUUBI #5811 private val tf = HiveResult.getTimeFormatters + private val bf = RowSet.getBinaryFormatter override def getColumnSizeFromSchemaType(schema: StructType): Int = schema.length @@ -51,6 +52,7 @@ class SparkTRowSetGenerator case BinaryType => asByteArrayTColumn(rows, ordinal) case _ => val timeFormatters = tf + val binaryFormatter = bf asStringTColumn( rows, ordinal, @@ -58,7 +60,8 @@ class SparkTRowSetGenerator (row, ordinal) => RowSet.toHiveString( getColumnAs[Any](row, ordinal) -> typ, - timeFormatters = timeFormatters)) + timeFormatters = timeFormatters, + binaryFormatter = binaryFormatter)) } } @@ -75,7 +78,11 @@ class SparkTRowSetGenerator case _ => asStringTColumnValue( row, ordinal, - rawValue => RowSet.toHiveString(rawValue -> types(ordinal).dataType, timeFormatters = tf)) + rawValue => + RowSet.toHiveString( + rawValue -> types(ordinal).dataType, + timeFormatters = tf, + binaryFormatter = bf)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala 
index b78c8b7a3..dda7bb4d0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -103,7 +103,7 @@ object SparkDatasetHelper extends Logging { // an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type. // TODO: reuse the timeFormatters on greater scale if possible, - // recreating timeFormatters may cause performance issue, see [KYUUBI#5811] + // recreating timeFormatters each time may cause a performance issue, see KYUUBI #5811 val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => { val dt = DataType.fromDDL(schemaDDL) dt match { @@ -111,22 +111,26 @@ object SparkDatasetHelper extends Logging { RowSet.toHiveString( (row, st), nested = true, - timeFormatters = HiveResult.getTimeFormatters) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter) case StructType(Array(StructField(_, at: ArrayType, _, _))) => RowSet.toHiveString( (row.toSeq.head, at), nested = true, - timeFormatters = HiveResult.getTimeFormatters) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter) case StructType(Array(StructField(_, mt: MapType, _, _))) => RowSet.toHiveString( (row.toSeq.head, mt), nested = true, - timeFormatters = HiveResult.getTimeFormatters) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter) case StructType(Array(StructField(_, tt: TimestampType, _, _))) => RowSet.toHiveString( (row.toSeq.head, tt), nested = true, - timeFormatters = HiveResult.getTimeFormatters) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter) case _ => throw new UnsupportedOperationException } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala index 228bdcaf2..417e84f8a 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala @@ -168,7 +168,8 @@ class RowSetSuite extends KyuubiFunSuite { case (b, i) => assert(b === RowSet.toHiveString( Date.valueOf(s"2018-11-${i + 1}") -> DateType, - timeFormatters = HiveResult.getTimeFormatters)) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter)) } val tsCol = cols.next().getStringVal @@ -177,7 +178,8 @@ class RowSetSuite extends KyuubiFunSuite { case (b, i) => assert(b === RowSet.toHiveString( Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType, - timeFormatters = HiveResult.getTimeFormatters)) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter)) } val binCol = cols.next().getBinaryVal @@ -191,7 +193,8 @@ class RowSetSuite extends KyuubiFunSuite { case (b, 11) => assert(b === "NULL") case (b, i) => assert(b === RowSet.toHiveString( Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType), - timeFormatters = HiveResult.getTimeFormatters)) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter)) } val mapCol = cols.next().getStringVal @@ -199,7 +202,8 @@ class RowSetSuite extends KyuubiFunSuite { case (b, 11) => assert(b === "NULL") case (b, i) => assert(b === RowSet.toHiveString( Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType), - timeFormatters = HiveResult.getTimeFormatters)) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter)) } val intervalCol = cols.next().getStringVal @@ -250,7 +254,8 @@ class RowSetSuite extends KyuubiFunSuite { 
assert(r8.get(13).getStringVal.getValue === RowSet.toHiveString( Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType), - timeFormatters = HiveResult.getTimeFormatters)) + timeFormatters = HiveResult.getTimeFormatters, + binaryFormatter = RowSet.getBinaryFormatter)) val r9 = iter.next().getColVals assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)