fixes #39 column based set not handle null values properly
This commit is contained in:
parent
42b2d86b08
commit
de4f4d128d
@ -1,24 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi
|
||||
|
||||
class KyuubiExecption(message: String, cause: Throwable)
|
||||
extends Exception(message, cause) {
|
||||
|
||||
def this(message: String) = this(message, null)
|
||||
}
|
||||
@ -37,7 +37,7 @@ import org.apache.spark.KyuubiConf._
|
||||
import org.apache.thrift.TProcessorFactory
|
||||
import org.apache.thrift.transport.{TServerSocket, TTransportException, TTransportFactory}
|
||||
|
||||
import yaooqinn.kyuubi.{KyuubiExecption, KyuubiSQLException}
|
||||
import yaooqinn.kyuubi.{KyuubiException, KyuubiSQLException}
|
||||
|
||||
/**
|
||||
* Authentication
|
||||
@ -65,7 +65,7 @@ class KyuubiAuthFactory(conf: SparkConf) {
|
||||
}
|
||||
Some(server)
|
||||
case NONE.name => None
|
||||
case other => throw new KyuubiExecption("Unsupported authentication method: " + other)
|
||||
case other => throw new KyuubiException("Unsupported authentication method: " + other)
|
||||
}
|
||||
|
||||
private[this] def getSaslProperties: Map[String, String] = {
|
||||
|
||||
@ -156,7 +156,8 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
// get the OperationLog object from the operation
|
||||
val opLog: OperationLog = getOperation(opHandle).getOperationLog
|
||||
if (opLog == null) {
|
||||
throw new KyuubiSQLException("Couldn't find log associated with operation handle: " + opHandle)
|
||||
throw new KyuubiSQLException(
|
||||
"Couldn't find log associated with operation handle: " + opHandle)
|
||||
}
|
||||
try {
|
||||
// convert logs to RowBasedSet
|
||||
|
||||
@ -39,88 +39,80 @@ case class ColumnBasedSet(types: StructType, rows: Seq[Row]) extends RowSet {
|
||||
val nulls = new BitSet()
|
||||
typ match {
|
||||
case BooleanType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) true else row.getBoolean(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) true else rows(i).getBoolean(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Boolean]).asJava
|
||||
TColumn.boolVal(new TBoolColumn(values, bitSetToBuffer(nulls)))
|
||||
case ByteType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getByte(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) 0.toByte else rows(i).getByte(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Byte]).asJava
|
||||
TColumn.byteVal(new TByteColumn(values, bitSetToBuffer(nulls)))
|
||||
case ShortType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getShort(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) 0.toShort else rows(i).getShort(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Short]).asJava
|
||||
TColumn.i16Val(new TI16Column(values, bitSetToBuffer(nulls)))
|
||||
case IntegerType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getInt(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getInt(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Integer]).asJava
|
||||
TColumn.i32Val(new TI32Column(values, bitSetToBuffer(nulls)))
|
||||
case LongType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getLong(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getLong(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Long]).asJava
|
||||
TColumn.i64Val(new TI64Column(values, bitSetToBuffer(nulls)))
|
||||
case FloatType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getFloat(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Double]).asJava
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getFloat(ordinal)
|
||||
}.map(_.toDouble.asInstanceOf[java.lang.Double]).asJava
|
||||
TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls)))
|
||||
case DoubleType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getDouble(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) 0 else rows(i).getDouble(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Double]).asJava
|
||||
TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls)))
|
||||
case StringType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) EMPTY_STRING else row.getString(ordinal)
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) EMPTY_STRING else rows(i).getString(ordinal)
|
||||
}.asJava
|
||||
TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls)))
|
||||
case BinaryType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) {
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) {
|
||||
EMPTY_BINARY
|
||||
} else {
|
||||
ByteBuffer.wrap(row.getAs[Array[Byte]](ordinal))
|
||||
ByteBuffer.wrap(rows(i).getAs[Array[Byte]](ordinal))
|
||||
}
|
||||
}.asJava
|
||||
TColumn.binaryVal(new TBinaryColumn(values, bitSetToBuffer(nulls)))
|
||||
case _ =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) {
|
||||
val values = rows.indices.map { i =>
|
||||
nulls.set(i, rows(i).isNullAt(ordinal))
|
||||
if (rows(i).isNullAt(ordinal)) {
|
||||
EMPTY_STRING
|
||||
} else {
|
||||
SparkSQLUtils.toHiveString((row.get(ordinal), typ))
|
||||
SparkSQLUtils.toHiveString((rows(i).get(ordinal), typ))
|
||||
}
|
||||
}.asJava
|
||||
TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls)))
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def bitSetToBuffer(bitSet: BitSet): ByteBuffer = ByteBuffer.wrap(bitSet.toByteArray)
|
||||
}
|
||||
|
||||
object ColumnBasedSet {
|
||||
private val EMPTY_STRING = ""
|
||||
private val EMPTY_BINARY = ByteBuffer.allocate(0)
|
||||
private val MASKS = Array[Byte](0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80.toByte)
|
||||
|
||||
private def bitSetToBuffer(bitset: BitSet): ByteBuffer = {
|
||||
val nulls = new Array[Byte](1 + (bitset.length / 8))
|
||||
(0 until bitset.length).foreach { i =>
|
||||
nulls(i / 8) = (nulls(i / 8) | (if (bitset.get(i)) MASKS(i % 8) else 0.toByte)).toByte
|
||||
}
|
||||
ByteBuffer.wrap(nulls)
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,10 +17,16 @@
|
||||
|
||||
package yaooqinn.kyuubi.schema
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.sql.Timestamp
|
||||
import java.util.BitSet
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.{Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
class ColumnBasedSetSuite extends SparkFunSuite {
|
||||
val maxRows: Int = 5
|
||||
@ -126,4 +132,252 @@ class ColumnBasedSetSuite extends SparkFunSuite {
|
||||
tRowSet = getNextRowSet(5, ifDrop = true).toTRowSet
|
||||
assert(tRowSet.getColumns.get(0).getI32Val.getValues.get(0).intValue() === 6)
|
||||
}
|
||||
|
||||
test("bool type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "boolean")
|
||||
val rows = Seq(Row(null), Row(true), Row(false), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getBoolVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getBoolVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getBoolVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getBoolVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getBoolVal.getValues.get(0).booleanValue())
|
||||
assert(tRowSet.getColumns.get(0).getBoolVal.getValues.get(1).booleanValue())
|
||||
assert(!tRowSet.getColumns.get(0).getBoolVal.getValues.get(2).booleanValue())
|
||||
assert(tRowSet.getColumns.get(0).getBoolVal.getValues.get(3).booleanValue())
|
||||
}
|
||||
|
||||
test("byte type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "byte")
|
||||
val rows = Seq(Row(null), Row(1.toByte), Row(2.toByte), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getByteVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getByteVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getByteVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getByteVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getByteVal.getValues.get(0).byteValue() === 0)
|
||||
assert(tRowSet.getColumns.get(0).getByteVal.getValues.get(1).byteValue() === 1)
|
||||
assert(tRowSet.getColumns.get(0).getByteVal.getValues.get(2).byteValue() === 2)
|
||||
assert(tRowSet.getColumns.get(0).getByteVal.getValues.get(3).byteValue() === 0)
|
||||
}
|
||||
|
||||
test("short type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "short")
|
||||
val rows = Seq(Row(null), Row(1.toShort), Row(2.toShort), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getI16Val.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getI16Val.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getI16Val.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getI16Val.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getI16Val.getValues.get(0).byteValue() === 0)
|
||||
assert(tRowSet.getColumns.get(0).getI16Val.getValues.get(1).byteValue() === 1)
|
||||
assert(tRowSet.getColumns.get(0).getI16Val.getValues.get(2).byteValue() === 2)
|
||||
assert(tRowSet.getColumns.get(0).getI16Val.getValues.get(3).byteValue() === 0)
|
||||
}
|
||||
|
||||
test("int type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "int")
|
||||
val rows = Seq(Row(null), Row(1), Row(2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getI32Val.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getI32Val.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getI32Val.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getI32Val.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getI32Val.getValues.get(0).byteValue() === 0)
|
||||
assert(tRowSet.getColumns.get(0).getI32Val.getValues.get(1).byteValue() === 1)
|
||||
assert(tRowSet.getColumns.get(0).getI32Val.getValues.get(2).byteValue() === 2)
|
||||
assert(tRowSet.getColumns.get(0).getI32Val.getValues.get(3).byteValue() === 0)
|
||||
}
|
||||
|
||||
test("long type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "long")
|
||||
val rows = Seq(Row(null), Row(1L), Row(2L), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getI64Val.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getI64Val.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getI64Val.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getI64Val.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getI64Val.getValues.get(0).byteValue() === 0)
|
||||
assert(tRowSet.getColumns.get(0).getI64Val.getValues.get(1).byteValue() === 1)
|
||||
assert(tRowSet.getColumns.get(0).getI64Val.getValues.get(2).byteValue() === 2)
|
||||
assert(tRowSet.getColumns.get(0).getI64Val.getValues.get(3).byteValue() === 0)
|
||||
}
|
||||
|
||||
test("float type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "float")
|
||||
val rows = Seq(Row(null), Row(1.0f), Row(2.0f), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(0).byteValue() === 0)
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(1).byteValue() === 1)
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(2).byteValue() === 2)
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(3).byteValue() === 0)
|
||||
}
|
||||
|
||||
test("double type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "double")
|
||||
val rows = Seq(Row(null), Row(1.0d), Row(2.0d), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getDoubleVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(0).byteValue() === 0)
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(1).byteValue() === 1)
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(2).byteValue() === 2)
|
||||
assert(tRowSet.getColumns.get(0).getDoubleVal.getValues.get(3).byteValue() === 0)
|
||||
}
|
||||
|
||||
test("string type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "string")
|
||||
val rows = Seq(Row(null), Row("a"), Row(""), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) === "a")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
test("binary type null value tests") {
|
||||
val schema1 = new StructType().add("c1", BinaryType)
|
||||
val v1 = Array(1.toByte)
|
||||
val v2 = Array(2.toByte)
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getBinaryVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getBinaryVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getBinaryVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getBinaryVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getBinaryVal.getValues.get(0) === ByteBuffer.allocate(0))
|
||||
assert(tRowSet.getColumns.get(0).getBinaryVal.getValues.get(1) === ByteBuffer.wrap(v1))
|
||||
assert(tRowSet.getColumns.get(0).getBinaryVal.getValues.get(2) === ByteBuffer.wrap(v2))
|
||||
assert(tRowSet.getColumns.get(0).getBinaryVal.getValues.get(3) === ByteBuffer.allocate(0))
|
||||
}
|
||||
|
||||
test("date type null value tests") {
|
||||
val schema1 = new StructType().add("c1", DateType)
|
||||
val v1 = "2018-03-12"
|
||||
val v2 = "2010-03-13"
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) === v1)
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) === v2)
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
test("timestamp type null value tests") {
|
||||
val schema1 = new StructType().add("c1", TimestampType)
|
||||
val v1 = new Timestamp(System.currentTimeMillis())
|
||||
val v2 = "2018-03-30 17:59:59"
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) === v1.toString)
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) === v2)
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
test("decimal type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "decimal(5, 1)")
|
||||
val v1 = new java.math.BigDecimal("100.12")
|
||||
val v2 = new java.math.BigDecimal("200.34")
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) ===
|
||||
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) ===
|
||||
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
test("array type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "array<int>")
|
||||
val v1 = mutable.WrappedArray.make(Array(1, 2))
|
||||
val v2 = mutable.WrappedArray.make(Array(3, 4))
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) ===
|
||||
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) ===
|
||||
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
test("map type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "map<int, array<string>>")
|
||||
val v1 = Map(1 -> mutable.WrappedArray.make(Array(1, 2)))
|
||||
val v2 = Map(2 -> mutable.WrappedArray.make(Array(3, 4)))
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) ===
|
||||
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) ===
|
||||
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
test("ss type null value tests") {
|
||||
val schema1 = new StructType().add("c1", "struct<a: int, b:string>")
|
||||
val v1 = Row(1, "12")
|
||||
val v2 = Row(2, "22")
|
||||
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
|
||||
val tRowSet = ColumnBasedSet(schema1, rows).toTRowSet
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(0))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(1))
|
||||
assert(!BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(2))
|
||||
assert(BitSet.valueOf(tRowSet.getColumns.get(0).getStringVal.getNulls).get(3))
|
||||
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(0) === "")
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(1) ===
|
||||
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(2) ===
|
||||
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
|
||||
assert(tRowSet.getColumns.get(0).getStringVal.getValues.get(3) === "")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user