[KYUUBI #6404] Fix HiveResult.toHiveString compatibility for Spark 4.0

# 🔍 Description

SPARK-47911 introduced breaking changes for `HiveResult.toHiveString`; here we use reflection to restore compatibility.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

```
build/mvn clean install -Pscala-2.13 -Pspark-master \
  -pl externals/kyuubi-spark-sql-engine -am \
  -Dtest=none -DwildcardSuites=org.apache.kyuubi.engine.spark.schema.RowSetSuite
```

before - compilation error
```
[INFO] --- scala-maven-plugin:4.8.0:compile (scala-compile-first) @ kyuubi-spark-sql-engine_2.13 ---
...
[ERROR] [Error] /home/kyuubi/apache-kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala:30: not enough arguments for method toHiveString: (a: (Any, org.apache.spark.sql.types.DataType), nested: Boolean, formatters: org.apache.spark.sql.execution.HiveResult.TimeFormatters, binaryFormatter: org.apache.spark.sql.execution.HiveResult.BinaryFormatter): String.
Unspecified value parameter binaryFormatter.
```

after - UT pass
```
[INFO] --- scalatest-maven-plugin:2.2.0:test (test) @ kyuubi-spark-sql-engine_2.13 ---
[INFO] ScalaTest report directory: /home/kyuubi/apache-kyuubi/externals/kyuubi-spark-sql-engine/target/surefire-reports
Discovery starting.
Discovery completed in 1 second, 959 milliseconds.
Run starting. Expected test count is: 3
RowSetSuite:
- column based set
- row based set
- to row set
Run completed in 2 seconds, 712 milliseconds.
Total number of tests run: 3
Suites: completed 2, aborted 0
Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6404 from pan3793/hive-string.

Closes #6404

6b3c743eb [Cheng Pan] fix breaking change of HiveResult.toHiveString caused by SPARK-47911

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2024-05-22 15:07:36 +08:00
parent 9c1b779b10
commit 5b592d07ca
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
5 changed files with 65 additions and 18 deletions

View File

@@ -262,6 +262,11 @@ object SparkSQLEngine extends Logging {
val rootDir = _sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(_sparkConf))
val outputDir = Utils.createTempDir(prefix = "repl", root = rootDir)
_sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
// SPARK-47911: we must set a value instead of leaving it as None, otherwise, we will get a
// "Cannot mutate ReadOnlySQLConf" exception when a task calls HiveResult.getBinaryFormatter.
// Here we follow the HiveResult.getBinaryFormatter behavior and set it to UTF8 if the
// configuration is absent, to preserve the legacy behavior for compatibility.
_sparkConf.setIfMissing("spark.sql.binaryOutputStyle", "UTF8")
_sparkConf.setIfMissing("spark.master", "local")
_sparkConf.set(
"spark.redaction.regex",

View File

@@ -17,17 +17,43 @@
package org.apache.kyuubi.engine.spark.schema
import java.lang.{Boolean => JBoolean}
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types._
import org.apache.kyuubi.util.reflect.DynMethods
object RowSet {
// SPARK-47911 (4.0.0) introduced it
type BinaryFormatter = Array[Byte] => String
def getBinaryFormatter: BinaryFormatter =
DynMethods.builder("getBinaryFormatter")
.impl(HiveResult.getClass) // for Spark 4.0 and later
.orNoop() // for Spark 3.5 and before
.buildChecked(HiveResult)
.invokeChecked[BinaryFormatter]()
def toHiveString(
valueAndType: (Any, DataType),
nested: Boolean = false,
timeFormatters: TimeFormatters): String = {
HiveResult.toHiveString(valueAndType, nested, timeFormatters)
}
nested: JBoolean = false,
timeFormatters: TimeFormatters,
binaryFormatter: BinaryFormatter): String =
DynMethods.builder("toHiveString")
.impl( // for Spark 3.5 and before
HiveResult.getClass,
classOf[(Any, DataType)],
classOf[Boolean],
classOf[TimeFormatters])
.impl( // for Spark 4.0 and later
HiveResult.getClass,
classOf[(Any, DataType)],
classOf[Boolean],
classOf[TimeFormatters],
classOf[BinaryFormatter])
.buildChecked(HiveResult)
.invokeChecked[String](valueAndType, nested, timeFormatters, binaryFormatter)
}

View File

@@ -27,8 +27,9 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
class SparkTRowSetGenerator
extends TRowSetGenerator[StructType, Row, DataType] {
// reused time formatters in single RowSet generation, see KYUUBI-5811
// reused time formatters in single RowSet generation, see KYUUBI #5811
private val tf = HiveResult.getTimeFormatters
private val bf = RowSet.getBinaryFormatter
override def getColumnSizeFromSchemaType(schema: StructType): Int = schema.length
@@ -51,6 +52,7 @@ class SparkTRowSetGenerator
case BinaryType => asByteArrayTColumn(rows, ordinal)
case _ =>
val timeFormatters = tf
val binaryFormatter = bf
asStringTColumn(
rows,
ordinal,
@@ -58,7 +60,8 @@ class SparkTRowSetGenerator
(row, ordinal) =>
RowSet.toHiveString(
getColumnAs[Any](row, ordinal) -> typ,
timeFormatters = timeFormatters))
timeFormatters = timeFormatters,
binaryFormatter = binaryFormatter))
}
}
@@ -75,7 +78,11 @@ class SparkTRowSetGenerator
case _ => asStringTColumnValue(
row,
ordinal,
rawValue => RowSet.toHiveString(rawValue -> types(ordinal).dataType, timeFormatters = tf))
rawValue =>
RowSet.toHiveString(
rawValue -> types(ordinal).dataType,
timeFormatters = tf,
binaryFormatter = bf))
}
}

View File

@@ -103,7 +103,7 @@ object SparkDatasetHelper extends Logging {
// an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
// TODO: reuse the timeFormatters on greater scale if possible,
// recreating timeFormatters may cause performance issue, see [KYUUBI#5811]
// recreate timeFormatters each time may cause performance issue, see KYUUBI #5811
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
val dt = DataType.fromDDL(schemaDDL)
dt match {
@@ -111,22 +111,26 @@ object SparkDatasetHelper extends Logging {
RowSet.toHiveString(
(row, st),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter)
case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
RowSet.toHiveString(
(row.toSeq.head, at),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
RowSet.toHiveString(
(row.toSeq.head, mt),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter)
case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
RowSet.toHiveString(
(row.toSeq.head, tt),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter)
case _ =>
throw new UnsupportedOperationException
}

View File

@@ -168,7 +168,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, i) =>
assert(b === RowSet.toHiveString(
Date.valueOf(s"2018-11-${i + 1}") -> DateType,
timeFormatters = HiveResult.getTimeFormatters))
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter))
}
val tsCol = cols.next().getStringVal
@@ -177,7 +178,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, i) => assert(b ===
RowSet.toHiveString(
Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType,
timeFormatters = HiveResult.getTimeFormatters))
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter))
}
val binCol = cols.next().getBinaryVal
@@ -191,7 +193,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter))
}
val mapCol = cols.next().getStringVal
@@ -199,7 +202,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter))
}
val intervalCol = cols.next().getStringVal
@@ -250,7 +254,8 @@ class RowSetSuite extends KyuubiFunSuite {
assert(r8.get(13).getStringVal.getValue ===
RowSet.toHiveString(
Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
timeFormatters = HiveResult.getTimeFormatters,
binaryFormatter = RowSet.getBinaryFormatter))
val r9 = iter.next().getColVals
assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)