From d227e616b2735f0a3201b642e16f17b38e60a80d Mon Sep 17 00:00:00 2001 From: QianChao Date: Fri, 25 Oct 2019 11:34:29 +0800 Subject: [PATCH] [KYUUBI-221] Optimise FetchResults speed when maxRows is large fix #221 Squashed commit of the following: commit f103b8d5dcbdb838b9dc1d46faec8e93c10af865 Author: QianChao Date: Fri Oct 25 10:14:30 2019 +0800 UPD: add more test cases for ColumnBaseSet commit eabd5d4341a0ceabce9f68e20fe173415ecae981 Author: QianChao Date: Thu Oct 17 15:16:40 2019 +0800 FIX: ColumnBaseSet nulls with stream issue commit 14c04820db2c6a22413e4b775ca71bae55d51aaf Author: QianChao Date: Wed Oct 16 15:04:22 2019 +0800 FIX: stream access issue --- .../kyuubi/schema/ColumnBasedSet.scala | 73 +++++++++--------- .../kyuubi/schema/ColumnBasedSetSuite.scala | 74 +++++++++++++++++++ 2 files changed, 113 insertions(+), 34 deletions(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala index 0972752da..69188bc8d 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala @@ -32,79 +32,84 @@ case class ColumnBasedSet(types: StructType, rows: Seq[Row]) extends RowSet { override def toTRowSet: TRowSet = { val tRowSet = new TRowSet(0, Seq[TRow]().asJava) if (rows != null) { - (0 until types.length).map(i => toTColumn(i, types(i).dataType)).foreach(tRowSet.addToColumns) + // rows is Stream[Row], to make it not lazy + val rowListWithIndex = rows.zipWithIndex.toList + (0 until types.length) + .map(i => toTColumn(rowListWithIndex, i, types(i).dataType)) + .foreach(tRowSet.addToColumns) } tRowSet } - private[this] def toTColumn(ordinal: Int, typ: DataType): TColumn = { + private[this] def toTColumn( + rowListWithIndex: List[(Row, Int)], ordinal: Int, typ: DataType): TColumn = { val nulls = new BitSet() typ match { case BooleanType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) true else rows(i).getBoolean(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) true else row.getBoolean(ordinal) }.map(_.asInstanceOf[java.lang.Boolean]).asJava TColumn.boolVal(new TBoolColumn(values, bitSetToBuffer(nulls))) case ByteType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) 0.toByte else rows(i).getByte(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0.toByte else row.getByte(ordinal) }.map(_.asInstanceOf[java.lang.Byte]).asJava TColumn.byteVal(new TByteColumn(values, bitSetToBuffer(nulls))) case ShortType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) 0.toShort else rows(i).getShort(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0.toShort else row.getShort(ordinal) }.map(_.asInstanceOf[java.lang.Short]).asJava TColumn.i16Val(new TI16Column(values, bitSetToBuffer(nulls))) case IntegerType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) 0 else rows(i).getInt(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getInt(ordinal) }.map(_.asInstanceOf[java.lang.Integer]).asJava TColumn.i32Val(new TI32Column(values, bitSetToBuffer(nulls))) case LongType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) 0 else rows(i).getLong(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getLong(ordinal) }.map(_.asInstanceOf[java.lang.Long]).asJava TColumn.i64Val(new TI64Column(values, bitSetToBuffer(nulls))) case FloatType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) 0 else rows(i).getFloat(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getFloat(ordinal) }.map(_.toDouble.asInstanceOf[java.lang.Double]).asJava TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls))) case DoubleType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) 0 else rows(i).getDouble(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getDouble(ordinal) }.map(_.asInstanceOf[java.lang.Double]).asJava TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls))) case StringType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) EMPTY_STRING else rows(i).getString(ordinal) + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) EMPTY_STRING else row.getString(ordinal) }.asJava TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls))) case BinaryType => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) { + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) { EMPTY_BINARY } else { - ByteBuffer.wrap(rows(i).getAs[Array[Byte]](ordinal)) + ByteBuffer.wrap(row.getAs[Array[Byte]](ordinal)) } }.asJava TColumn.binaryVal(new TBinaryColumn(values, bitSetToBuffer(nulls))) case _ => - val values = rows.indices.map { i => - nulls.set(i, rows(i).isNullAt(ordinal)) - if (rows(i).isNullAt(ordinal)) { + val values = rowListWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) { EMPTY_STRING } else { - SparkSQLUtils.toHiveString((rows(i).get(ordinal), typ)) + SparkSQLUtils.toHiveString((row.get(ordinal), typ)) } }.asJava TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls))) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala index 77501323d..58dc8f90b 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala @@ -92,6 +92,80 @@ class ColumnBasedSetSuite extends SparkFunSuite { assert(listIter.isEmpty) } + test("row set null value test with iterator") { + val schema = new StructType() + .add("a", "int") + .add("b", "string") + .add("c", "boolean") + .add("d", "long") + .add("e", "short") + .add("f", "double") + val rows = Seq( + Row(1, "11", true, 1L, 1.toShort, 1.0), + Row(2, "22", null, 2L, 2.toShort, 2.0), + Row(3, null, false, null, 3.toShort, 3.0), + Row(4, "44", false, 4L, null, 4.0), + Row(5, null, true, null, null, null)) + + var iter = rows.toIterator.toSeq + var tRowSet = ColumnBasedSet(schema, iter).toTRowSet + + val columnA = tRowSet.getColumns.get(0).getI32Val.getValues + assert(columnA.get(0) === 1) + assert(columnA.get(1) === 2) + assert(columnA.get(2) === 3) + assert(columnA.get(3) === 4) + assert(columnA.get(4) === 5) + + val columnB = tRowSet.getColumns.get(1).getStringVal.getValues + assert(columnB.get(0) === "11") + assert(columnB.get(1) === "22") + assert(columnB.get(2) === "") + assert(columnB.get(3) === "44") + assert(columnB.get(4) === "") + + val columnC = tRowSet.getColumns.get(2).getBoolVal.getValues + assert(columnC.get(0) === true) + assert(columnC.get(1) === true) + assert(columnC.get(2) === false) + assert(columnC.get(3) === false) + assert(columnC.get(4) === true) + + val columnD = tRowSet.getColumns.get(3).getI64Val.getValues + assert(columnD.get(0) === 1L) + assert(columnD.get(1) === 2L) + assert(columnD.get(2) === 0L) + assert(columnD.get(3) === 4L) + assert(columnD.get(4) === 0L) + + val columnE = tRowSet.getColumns.get(4).getI16Val.getValues + assert(columnE.get(0) === 1.toShort) + assert(columnE.get(1) === 2.toShort) + assert(columnE.get(2) === 3.toShort) + assert(columnE.get(3) === 0.toShort) + assert(columnE.get(4) === 0.toShort) + + val columnF = tRowSet.getColumns.get(5).getDoubleVal.getValues + assert(columnE.get(0) === 1.0) + assert(columnE.get(1) === 2.0) + assert(columnE.get(2) === 3.0) + assert(columnE.get(3) === 0.toDouble) + assert(columnE.get(4) === 0.toDouble) + + // byteBuffer 00000 + assert(tRowSet.getColumns.get(0).getI32Val.getNulls === Array[Byte]()) + // byteBuffer 10100 + assert(tRowSet.getColumns.get(1).getStringVal.getNulls === Array[Byte](20)) + // byteBuffer 00010 + assert(tRowSet.getColumns.get(2).getBoolVal.getNulls === Array[Byte](2)) + // byteBuffer 10100 + assert(tRowSet.getColumns.get(3).getI64Val.getNulls === Array[Byte](20)) + // byteBuffer 11000 + assert(tRowSet.getColumns.get(4).getI16Val.getNulls === Array[Byte](24)) + // byteBuffer 10000 + assert(tRowSet.getColumns.get(5).getDoubleVal.getNulls === Array[Byte](16)) + } + test("kyuubi set to TRowSet then to Hive Row Set") { val rowIterator = rows.iterator val taken = rowIterator.take(maxRows).toSeq