[KYUUBI #5811] [SPARK] Reuse time formatters instance in value serialization of TRowSet generation

# 🔍 Description
## Issue References 🔗

Subtask of #5808.

## Describe Your Solution 🔧
Value serialization to Hive style string by  `HiveResult.toHiveString`  requires a `TimeFormatters` instance handling the date/time data types. Reusing the pre-created time formatters's instance, it dramatically reduces the overhead and improves the TRowset generation.

This may help to reduce memory footprints and fewer operations for TRowSet generation performance.

This is also aligned to the Spark's `RowSetUtils` in the implementation of RowSet generation for  [SPARK-39041](https://issues.apache.org/jira/browse/SPARK-39041) by yaooqinn , with explicitly declared TimeFormatters instance.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5811 from bowenliang123/rowset-timeformatter.

Closes #5811

22709914e [Bowen Liang] Reuse time formatters instance in value serialization of TRowSet

Authored-by: Bowen Liang <liangbowen@gf.com.cn>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Bowen Liang 2023-12-07 20:22:18 +08:00 committed by Cheng Pan
parent 6a282fc5e9
commit 4463cc8f97
3 changed files with 53 additions and 18 deletions

View File

@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
@ -30,9 +31,10 @@ import org.apache.kyuubi.util.RowSetUtils._
object RowSet {
def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
// compatible w/ Spark 3.1 and above
val timeFormatters = HiveResult.getTimeFormatters
def toHiveString(
valueAndType: (Any, DataType),
nested: Boolean = false,
timeFormatters: TimeFormatters): String = {
HiveResult.toHiveString(valueAndType, nested, timeFormatters)
}
@ -71,6 +73,7 @@ object RowSet {
def toRowBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val rowSize = rows.length
val tRows = new java.util.ArrayList[TRow](rowSize)
val timeFormatters = HiveResult.getTimeFormatters
var i = 0
while (i < rowSize) {
val row = rows(i)
@ -78,7 +81,7 @@ object RowSet {
var j = 0
val columnSize = row.length
while (j < columnSize) {
val columnValue = toTColumnValue(j, row, schema)
val columnValue = toTColumnValue(j, row, schema, timeFormatters)
tRow.addToColVals(columnValue)
j += 1
}
@ -91,18 +94,23 @@ object RowSet {
def toColumnBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val rowSize = rows.length
val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](rowSize))
val timeFormatters = HiveResult.getTimeFormatters
var i = 0
val columnSize = schema.length
while (i < columnSize) {
val field = schema(i)
val tColumn = toTColumn(rows, i, field.dataType)
val tColumn = toTColumn(rows, i, field.dataType, timeFormatters)
tRowSet.addToColumns(tColumn)
i += 1
}
tRowSet
}
private def toTColumn(rows: Seq[Row], ordinal: Int, typ: DataType): TColumn = {
private def toTColumn(
rows: Seq[Row],
ordinal: Int,
typ: DataType,
timeFormatters: TimeFormatters): TColumn = {
val nulls = new java.util.BitSet()
typ match {
case BooleanType =>
@ -152,7 +160,7 @@ object RowSet {
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row.isNullAt(ordinal))
values.add(toHiveString(row.get(ordinal) -> typ))
values.add(toHiveString(row.get(ordinal) -> typ, timeFormatters = timeFormatters))
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
@ -184,7 +192,8 @@ object RowSet {
private def toTColumnValue(
ordinal: Int,
row: Row,
types: StructType): TColumnValue = {
types: StructType,
timeFormatters: TimeFormatters): TColumnValue = {
types(ordinal).dataType match {
case BooleanType =>
val boolValue = new TBoolValue
@ -232,7 +241,9 @@ object RowSet {
case _ =>
val tStrValue = new TStringValue
if (!row.isNullAt(ordinal)) {
tStrValue.setValue(toHiveString(row.get(ordinal) -> types(ordinal).dataType))
tStrValue.setValue(toHiveString(
row.get(ordinal) -> types(ordinal).dataType,
timeFormatters = timeFormatters))
}
TColumnValue.stringVal(tStrValue)
}

View File

@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@ -105,17 +106,31 @@ object SparkDatasetHelper extends Logging {
val quotedCol = (name: String) => col(quoteIfNeeded(name))
// an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
// TODO: reuse the timeFormatters on greater scale if possible,
// recreating timeFormatters may cause performance issue, see [KYUUBI#5811]
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
val dt = DataType.fromDDL(schemaDDL)
dt match {
case StructType(Array(StructField(_, st: StructType, _, _))) =>
RowSet.toHiveString((row, st), nested = true)
RowSet.toHiveString(
(row, st),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, at), nested = true)
RowSet.toHiveString(
(row.toSeq.head, at),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, mt), nested = true)
RowSet.toHiveString(
(row.toSeq.head, mt),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, tt), nested = true)
RowSet.toHiveString(
(row.toSeq.head, tt),
nested = true,
timeFormatters = HiveResult.getTimeFormatters)
case _ =>
throw new UnsupportedOperationException
}

View File

@ -25,6 +25,7 @@ import java.time.{Instant, LocalDate}
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@ -165,14 +166,18 @@ class RowSetSuite extends KyuubiFunSuite {
dateCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) =>
assert(b === RowSet.toHiveString(Date.valueOf(s"2018-11-${i + 1}") -> DateType))
assert(b === RowSet.toHiveString(
Date.valueOf(s"2018-11-${i + 1}") -> DateType,
timeFormatters = HiveResult.getTimeFormatters))
}
val tsCol = cols.next().getStringVal
tsCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b ===
RowSet.toHiveString(Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType))
RowSet.toHiveString(
Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType,
timeFormatters = HiveResult.getTimeFormatters))
}
val binCol = cols.next().getBinaryVal
@ -185,14 +190,16 @@ class RowSetSuite extends KyuubiFunSuite {
arrCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType)))
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
}
val mapCol = cols.next().getStringVal
mapCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType)))
Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
}
val intervalCol = cols.next().getStringVal
@ -241,7 +248,9 @@ class RowSetSuite extends KyuubiFunSuite {
val r8 = iter.next().getColVals
assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]"))
assert(r8.get(13).getStringVal.getValue ===
RowSet.toHiveString(Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType)))
RowSet.toHiveString(
Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType),
timeFormatters = HiveResult.getTimeFormatters))
val r9 = iter.next().getColVals
assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)