[KYUUBI-221] Optimise FetchResults speed when maxRows is large fix #221

Squashed commit of the following:

commit f103b8d5dcbdb838b9dc1d46faec8e93c10af865
Author: QianChao <chaoqian@advance.ai>
Date:   Fri Oct 25 10:14:30 2019 +0800

    UPD: add more test cases for ColumnBaseSet

commit eabd5d4341a0ceabce9f68e20fe173415ecae981
Author: QianChao <chaoqian@advance.ai>
Date:   Thu Oct 17 15:16:40 2019 +0800

    FIX: ColumnBaseSet nulls with stream issue

commit 14c04820db2c6a22413e4b775ca71bae55d51aaf
Author: QianChao <chaoqian@advance.ai>
Date:   Wed Oct 16 15:04:22 2019 +0800

    FIX: stream access issue
This commit is contained in:
QianChao 2019-10-25 11:34:29 +08:00 committed by Kent Yao
parent c6ef20089e
commit d227e616b2
2 changed files with 113 additions and 34 deletions

View File

@ -32,79 +32,84 @@ case class ColumnBasedSet(types: StructType, rows: Seq[Row]) extends RowSet {
override def toTRowSet: TRowSet = {
val tRowSet = new TRowSet(0, Seq[TRow]().asJava)
if (rows != null) {
(0 until types.length).map(i => toTColumn(i, types(i).dataType)).foreach(tRowSet.addToColumns)
// rows is Stream[Row], to make it not lazy
val rowListWithIndex = rows.zipWithIndex.toList
(0 until types.length)
.map(i => toTColumn(rowListWithIndex, i, types(i).dataType))
.foreach(tRowSet.addToColumns)
}
tRowSet
}
private[this] def toTColumn(ordinal: Int, typ: DataType): TColumn = {
private[this] def toTColumn(
rowListWithIndex: List[(Row, Int)], ordinal: Int, typ: DataType): TColumn = {
val nulls = new BitSet()
typ match {
case BooleanType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) true else rows(i).getBoolean(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) true else row.getBoolean(ordinal)
}.map(_.asInstanceOf[java.lang.Boolean]).asJava
TColumn.boolVal(new TBoolColumn(values, bitSetToBuffer(nulls)))
case ByteType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) 0.toByte else rows(i).getByte(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) 0.toByte else row.getByte(ordinal)
}.map(_.asInstanceOf[java.lang.Byte]).asJava
TColumn.byteVal(new TByteColumn(values, bitSetToBuffer(nulls)))
case ShortType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) 0.toShort else rows(i).getShort(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) 0.toShort else row.getShort(ordinal)
}.map(_.asInstanceOf[java.lang.Short]).asJava
TColumn.i16Val(new TI16Column(values, bitSetToBuffer(nulls)))
case IntegerType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getInt(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) 0 else row.getInt(ordinal)
}.map(_.asInstanceOf[java.lang.Integer]).asJava
TColumn.i32Val(new TI32Column(values, bitSetToBuffer(nulls)))
case LongType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getLong(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) 0 else row.getLong(ordinal)
}.map(_.asInstanceOf[java.lang.Long]).asJava
TColumn.i64Val(new TI64Column(values, bitSetToBuffer(nulls)))
case FloatType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getFloat(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) 0 else row.getFloat(ordinal)
}.map(_.toDouble.asInstanceOf[java.lang.Double]).asJava
TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls)))
case DoubleType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getDouble(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) 0 else row.getDouble(ordinal)
}.map(_.asInstanceOf[java.lang.Double]).asJava
TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls)))
case StringType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) EMPTY_STRING else rows(i).getString(ordinal)
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) EMPTY_STRING else row.getString(ordinal)
}.asJava
TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls)))
case BinaryType =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) {
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) {
EMPTY_BINARY
} else {
ByteBuffer.wrap(rows(i).getAs[Array[Byte]](ordinal))
ByteBuffer.wrap(row.getAs[Array[Byte]](ordinal))
}
}.asJava
TColumn.binaryVal(new TBinaryColumn(values, bitSetToBuffer(nulls)))
case _ =>
val values = rows.indices.map { i =>
nulls.set(i, rows(i).isNullAt(ordinal))
if (rows(i).isNullAt(ordinal)) {
val values = rowListWithIndex.map { case (row, i) =>
nulls.set(i, row.isNullAt(ordinal))
if (row.isNullAt(ordinal)) {
EMPTY_STRING
} else {
SparkSQLUtils.toHiveString((rows(i).get(ordinal), typ))
SparkSQLUtils.toHiveString((row.get(ordinal), typ))
}
}.asJava
TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls)))

View File

@ -92,6 +92,80 @@ class ColumnBasedSetSuite extends SparkFunSuite {
assert(listIter.isEmpty)
}
test("row set null value test with iterator") {
val schema = new StructType()
.add("a", "int")
.add("b", "string")
.add("c", "boolean")
.add("d", "long")
.add("e", "short")
.add("f", "double")
val rows = Seq(
Row(1, "11", true, 1L, 1.toShort, 1.0),
Row(2, "22", null, 2L, 2.toShort, 2.0),
Row(3, null, false, null, 3.toShort, 3.0),
Row(4, "44", false, 4L, null, 4.0),
Row(5, null, true, null, null, null))
var iter = rows.toIterator.toSeq
var tRowSet = ColumnBasedSet(schema, iter).toTRowSet
val columnA = tRowSet.getColumns.get(0).getI32Val.getValues
assert(columnA.get(0) === 1)
assert(columnA.get(1) === 2)
assert(columnA.get(2) === 3)
assert(columnA.get(3) === 4)
assert(columnA.get(4) === 5)
val columnB = tRowSet.getColumns.get(1).getStringVal.getValues
assert(columnB.get(0) === "11")
assert(columnB.get(1) === "22")
assert(columnB.get(2) === "")
assert(columnB.get(3) === "44")
assert(columnB.get(4) === "")
val columnC = tRowSet.getColumns.get(2).getBoolVal.getValues
assert(columnC.get(0) === true)
assert(columnC.get(1) === true)
assert(columnC.get(2) === false)
assert(columnC.get(3) === false)
assert(columnC.get(4) === true)
val columnD = tRowSet.getColumns.get(3).getI64Val.getValues
assert(columnD.get(0) === 1L)
assert(columnD.get(1) === 2L)
assert(columnD.get(2) === 0L)
assert(columnD.get(3) === 4L)
assert(columnD.get(4) === 0L)
val columnE = tRowSet.getColumns.get(4).getI16Val.getValues
assert(columnE.get(0) === 1.toShort)
assert(columnE.get(1) === 2.toShort)
assert(columnE.get(2) === 3.toShort)
assert(columnE.get(3) === 0.toShort)
assert(columnE.get(4) === 0.toShort)
val columnF = tRowSet.getColumns.get(5).getDoubleVal.getValues
assert(columnE.get(0) === 1.0)
assert(columnE.get(1) === 2.0)
assert(columnE.get(2) === 3.0)
assert(columnE.get(3) === 0.toDouble)
assert(columnE.get(4) === 0.toDouble)
// byteBuffer 00000
assert(tRowSet.getColumns.get(0).getI32Val.getNulls === Array[Byte]())
// byteBuffer 10100
assert(tRowSet.getColumns.get(1).getStringVal.getNulls === Array[Byte](20))
// byteBuffer 00010
assert(tRowSet.getColumns.get(2).getBoolVal.getNulls === Array[Byte](2))
// byteBuffer 10100
assert(tRowSet.getColumns.get(3).getI64Val.getNulls === Array[Byte](20))
// byteBuffer 11000
assert(tRowSet.getColumns.get(4).getI16Val.getNulls === Array[Byte](24))
// byteBuffer 10000
assert(tRowSet.getColumns.get(5).getDoubleVal.getNulls === Array[Byte](16))
}
test("kyuubi set to TRowSet then to Hive Row Set") {
val rowIterator = rows.iterator
val taken = rowIterator.take(maxRows).toSeq