This commit is contained in:
Kent Yao 2018-06-24 04:06:35 +08:00
parent f165165254
commit 921b481b4d
4 changed files with 186 additions and 8 deletions

View File

@ -88,17 +88,20 @@ case class RowBasedSet(types: StructType, rows: Seq[Row]) extends RowSet {
case DateType =>
val tStringValue = new TStringValue
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getDate(ordinal).toString)
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.get(ordinal).toString)
TColumnValue.stringVal(tStringValue)
case TimestampType =>
val tStringValue = new TStringValue
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getTimestamp(ordinal).toString)
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.get(ordinal).toString)
TColumnValue.stringVal(tStringValue)
case BinaryType =>
val tStringValue = new TStringValue
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getAs[Array[Byte]](ordinal).toString)
if (!row.isNullAt(ordinal)) {
val bytes = row.getAs[Array[Byte]](ordinal)
tStringValue.setValue(SparkSQLUtils.toHiveString((bytes, types(ordinal).dataType)))
}
TColumnValue.stringVal(tStringValue)
case _: ArrayType | _: StructType | _: MapType =>

View File

@ -103,8 +103,6 @@ class ColumnBasedSetSuite extends SparkFunSuite {
assert(hiveRowSet.getColumns.get(1).get(4).equals("55"))
}
test("get global row iterator with array to iterator") {
def getNextRowSet(maxRowsL: Long, ifDrop: Boolean = false): RowSet = {

View File

@ -17,10 +17,17 @@
package yaooqinn.kyuubi.schema
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.util.BitSet
import scala.collection.mutable
import org.apache.hive.service.cli
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSQLUtils}
import org.apache.spark.sql.types.{BinaryType, DateType, StructType, TimestampType}
class RowBasedSetSuite extends SparkFunSuite {
val maxRows: Int = 5
@ -140,4 +147,168 @@ class RowBasedSetSuite extends SparkFunSuite {
assert(row5.next().asInstanceOf[Int] === 5)
assert(row5.next().equals("55"))
}
test("bool type null value tests") {
val schema1 = new StructType().add("c1", "boolean")
val rows = Seq(Row(null), Row(true), Row(false), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(!tRowSet.getRows.get(0).getColVals.get(0).getBoolVal.isSetValue)
assert(tRowSet.getRows.get(1).getColVals.get(0).getBoolVal.isSetValue)
assert(tRowSet.getRows.get(2).getColVals.get(0).getBoolVal.isSetValue)
assert(!tRowSet.getRows.get(3).getColVals.get(0).getBoolVal.isSetValue)
}
test("byte type null value tests") {
val schema1 = new StructType().add("c1", "byte")
val rows = Seq(Row(null), Row(1.toByte), Row(2.toByte), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getByteVal.getValue === 0)
assert(tRowSet.getRows.get(1).getColVals.get(0).getByteVal.getValue === 1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getByteVal.getValue === 2)
assert(tRowSet.getRows.get(3).getColVals.get(0).getByteVal.getValue === 0)
}
test("short type null value tests") {
val schema1 = new StructType().add("c1", "short")
val rows = Seq(Row(null), Row(1.toShort), Row(2.toShort), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getI16Val.getValue === 0)
assert(tRowSet.getRows.get(1).getColVals.get(0).getI16Val.getValue === 1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getI16Val.getValue === 2)
assert(tRowSet.getRows.get(3).getColVals.get(0).getI16Val.getValue === 0)
}
test("int type null value tests") {
val schema1 = new StructType().add("c1", "int")
val rows = Seq(Row(null), Row(1), Row(2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getI32Val.getValue === 0)
assert(tRowSet.getRows.get(1).getColVals.get(0).getI32Val.getValue === 1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getI32Val.getValue === 2)
assert(tRowSet.getRows.get(3).getColVals.get(0).getI32Val.getValue === 0)
}
test("long type null value tests") {
val schema1 = new StructType().add("c1", "long")
val rows = Seq(Row(null), Row(1L), Row(2L), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getI64Val.getValue === 0)
assert(tRowSet.getRows.get(1).getColVals.get(0).getI64Val.getValue === 1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getI64Val.getValue === 2)
assert(tRowSet.getRows.get(3).getColVals.get(0).getI64Val.getValue === 0)
}
test("float type null value tests") {
val schema1 = new StructType().add("c1", "float")
val rows = Seq(Row(null), Row(1.0f), Row(2.0f), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getDoubleVal.getValue === 0)
assert(tRowSet.getRows.get(1).getColVals.get(0).getDoubleVal.getValue === 1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getDoubleVal.getValue === 2)
assert(tRowSet.getRows.get(3).getColVals.get(0).getDoubleVal.getValue === 0)
}
test("double type null value tests") {
val schema1 = new StructType().add("c1", "double")
val rows = Seq(Row(null), Row(1.0d), Row(2.0d), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getDoubleVal.getValue === 0)
assert(tRowSet.getRows.get(1).getColVals.get(0).getDoubleVal.getValue === 1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getDoubleVal.getValue === 2)
assert(tRowSet.getRows.get(3).getColVals.get(0).getDoubleVal.getValue === 0)
}
test("string type null value tests") {
val schema1 = new StructType().add("c1", "string")
val rows = Seq(Row(null), Row("a"), Row(""), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === "a")
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === "")
assert(tRowSet.getRows.get(3).getColVals.get(0).getStringVal.getValue === null)
}
test("binary type null value tests") {
val schema1 = new StructType().add("c1", BinaryType)
val v1 = Array(1.toByte)
val v2 = Array(2.toByte)
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v1, BinaryType)))
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v2, BinaryType)))
}
test("date type null value tests") {
val schema1 = new StructType().add("c1", DateType)
val v1 = "2018-03-12"
val v2 = "2010-03-13"
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === v1)
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === v2)
}
test("timestamp type null value tests") {
val schema1 = new StructType().add("c1", TimestampType)
val v1 = new Timestamp(System.currentTimeMillis())
val v2 = "2018-03-30 17:59:59"
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === v1.toString)
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === v2)
}
test("decimal type null value tests") {
val schema1 = new StructType().add("c1", "decimal(5, 1)")
val v1 = new java.math.BigDecimal("100.12")
val v2 = new java.math.BigDecimal("200.34")
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === v1.toString)
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === v2.toString)
}
test("array type null value tests") {
val schema1 = new StructType().add("c1", "array<int>")
val v1 = mutable.WrappedArray.make(Array(1, 2))
val v2 = mutable.WrappedArray.make(Array(3, 4))
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
}
test("map type null value tests") {
val schema1 = new StructType().add("c1", "map<int, array<string>>")
val v1 = Map(1 -> mutable.WrappedArray.make(Array(1, 2)))
val v2 = Map(2 -> mutable.WrappedArray.make(Array(3, 4)))
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
}
test("ss type null value tests") {
val schema1 = new StructType().add("c1", "struct<a: int, b:string>")
val v1 = Row(1, "12")
val v2 = Row(2, "22")
val rows = Seq(Row(null), Row(v1), Row(v2), Row(null))
val tRowSet = RowBasedSet(schema1, rows).toTRowSet
assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null)
assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v1, schema1.head.dataType)))
assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue ===
SparkSQLUtils.toHiveString((v2, schema1.head.dataType)))
}
}

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import yaooqinn.kyuubi.service.ServiceException
import yaooqinn.kyuubi.service.{ServiceException, State}
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiServerSuite extends SparkFunSuite {
@ -77,13 +77,19 @@ class KyuubiServerSuite extends SparkFunSuite {
KyuubiServer.setupCommonConfig(conf)
val server = new KyuubiServer()
server.init(conf)
assert(server.getServiceState === State.INITED)
assert(server.feService !== null)
assert(server.beService !== null)
assert(server.beService.getSessionManager !== null)
assert(server.beService.getSessionManager.getOperationMgr !== null)
assert(server.getStartTime === 0)
server.start()
assert(server.getServices.nonEmpty)
assert(server.getStartTime !== 0)
assert(server.getServiceState === State.STARTED)
assert(ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get)
server.stop()
assert(server.getServiceState === State.STOPPED)
assert(!ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get)
}