diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala index b409f8e67..bc7c2678f 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala @@ -88,17 +88,20 @@ case class RowBasedSet(types: StructType, rows: Seq[Row]) extends RowSet { case DateType => val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getDate(ordinal).toString) + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.get(ordinal).toString) TColumnValue.stringVal(tStringValue) case TimestampType => val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getTimestamp(ordinal).toString) + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.get(ordinal).toString) TColumnValue.stringVal(tStringValue) case BinaryType => val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getAs[Array[Byte]](ordinal).toString) + if (!row.isNullAt(ordinal)) { + val bytes = row.getAs[Array[Byte]](ordinal) + tStringValue.setValue(SparkSQLUtils.toHiveString((bytes, types(ordinal).dataType))) + } TColumnValue.stringVal(tStringValue) case _: ArrayType | _: StructType | _: MapType => diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala index 94cc02526..e7cb57956 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/ColumnBasedSetSuite.scala @@ -103,8 +103,6 @@ class ColumnBasedSetSuite extends SparkFunSuite { assert(hiveRowSet.getColumns.get(1).get(4).equals("55")) } - - test("get global row iterator with array to iterator") { def getNextRowSet(maxRowsL: Long, ifDrop: Boolean = false): RowSet = { diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala index c1caf935f..6150ce055 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala @@ -17,10 +17,17 @@ package yaooqinn.kyuubi.schema +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.sql.Timestamp +import java.util.BitSet + +import scala.collection.mutable + import org.apache.hive.service.cli import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.Row -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SparkSQLUtils} +import org.apache.spark.sql.types.{BinaryType, DateType, StructType, TimestampType} class RowBasedSetSuite extends SparkFunSuite { val maxRows: Int = 5 @@ -140,4 +147,168 @@ class RowBasedSetSuite extends SparkFunSuite { assert(row5.next().asInstanceOf[Int] === 5) assert(row5.next().equals("55")) } + + test("bool type null value tests") { + val schema1 = new StructType().add("c1", "boolean") + val rows = Seq(Row(null), Row(true), Row(false), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(!tRowSet.getRows.get(0).getColVals.get(0).getBoolVal.isSetValue) + assert(tRowSet.getRows.get(1).getColVals.get(0).getBoolVal.isSetValue) + assert(tRowSet.getRows.get(2).getColVals.get(0).getBoolVal.isSetValue) + assert(!tRowSet.getRows.get(3).getColVals.get(0).getBoolVal.isSetValue) + } + + test("byte type null value tests") { + val schema1 = new StructType().add("c1", "byte") + val rows = Seq(Row(null), Row(1.toByte), Row(2.toByte), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getByteVal.getValue === 0) + assert(tRowSet.getRows.get(1).getColVals.get(0).getByteVal.getValue === 1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getByteVal.getValue === 2) + assert(tRowSet.getRows.get(3).getColVals.get(0).getByteVal.getValue === 0) + } + + test("short type null value tests") { + val schema1 = new StructType().add("c1", "short") + val rows = Seq(Row(null), Row(1.toShort), Row(2.toShort), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getI16Val.getValue === 0) + assert(tRowSet.getRows.get(1).getColVals.get(0).getI16Val.getValue === 1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getI16Val.getValue === 2) + assert(tRowSet.getRows.get(3).getColVals.get(0).getI16Val.getValue === 0) + } + + test("int type null value tests") { + val schema1 = new StructType().add("c1", "int") + val rows = Seq(Row(null), Row(1), Row(2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getI32Val.getValue === 0) + assert(tRowSet.getRows.get(1).getColVals.get(0).getI32Val.getValue === 1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getI32Val.getValue === 2) + assert(tRowSet.getRows.get(3).getColVals.get(0).getI32Val.getValue === 0) + } + + test("long type null value tests") { + val schema1 = new StructType().add("c1", "long") + val rows = Seq(Row(null), Row(1L), Row(2L), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getI64Val.getValue === 0) + assert(tRowSet.getRows.get(1).getColVals.get(0).getI64Val.getValue === 1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getI64Val.getValue === 2) + assert(tRowSet.getRows.get(3).getColVals.get(0).getI64Val.getValue === 0) + } + + test("float type null value tests") { + val schema1 = new StructType().add("c1", "float") + val rows = Seq(Row(null), Row(1.0f), Row(2.0f), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getDoubleVal.getValue === 0) + assert(tRowSet.getRows.get(1).getColVals.get(0).getDoubleVal.getValue === 1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getDoubleVal.getValue === 2) + assert(tRowSet.getRows.get(3).getColVals.get(0).getDoubleVal.getValue === 0) + } + + test("double type null value tests") { + val schema1 = new StructType().add("c1", "double") + val rows = Seq(Row(null), Row(1.0d), Row(2.0d), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getDoubleVal.getValue === 0) + assert(tRowSet.getRows.get(1).getColVals.get(0).getDoubleVal.getValue === 1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getDoubleVal.getValue === 2) + assert(tRowSet.getRows.get(3).getColVals.get(0).getDoubleVal.getValue === 0) + } + + test("string type null value tests") { + val schema1 = new StructType().add("c1", "string") + val rows = Seq(Row(null), Row("a"), Row(""), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === "a") + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === "") + assert(tRowSet.getRows.get(3).getColVals.get(0).getStringVal.getValue === null) + } + + test("binary type null value tests") { + val schema1 = new StructType().add("c1", BinaryType) + val v1 = Array(1.toByte) + val v2 = Array(2.toByte) + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v1, BinaryType))) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v2, BinaryType))) + } + + test("date type null value tests") { + val schema1 = new StructType().add("c1", DateType) + val v1 = "2018-03-12" + val v2 = "2010-03-13" + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === v1) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === v2) + } + + test("timestamp type null value tests") { + val schema1 = new StructType().add("c1", TimestampType) + val v1 = new Timestamp(System.currentTimeMillis()) + val v2 = "2018-03-30 17:59:59" + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === v1.toString) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === v2) + } + + test("decimal type null value tests") { + val schema1 = new StructType().add("c1", "decimal(5, 1)") + val v1 = new java.math.BigDecimal("100.12") + val v2 = new java.math.BigDecimal("200.34") + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === v1.toString) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === v2.toString) + } + + test("array type null value tests") { + val schema1 = new StructType().add("c1", "array") + val v1 = mutable.WrappedArray.make(Array(1, 2)) + val v2 = mutable.WrappedArray.make(Array(3, 4)) + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v1, schema1.head.dataType))) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v2, schema1.head.dataType))) + } + + test("map type null value tests") { + val schema1 = new StructType().add("c1", "map>") + val v1 = Map(1 -> mutable.WrappedArray.make(Array(1, 2))) + val v2 = Map(2 -> mutable.WrappedArray.make(Array(3, 4))) + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v1, schema1.head.dataType))) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v2, schema1.head.dataType))) + } + + test("ss type null value tests") { + val schema1 = new StructType().add("c1", "struct") + val v1 = Row(1, "12") + val v2 = Row(2, "22") + val rows = Seq(Row(null), Row(v1), Row(v2), Row(null)) + val tRowSet = RowBasedSet(schema1, rows).toTRowSet + assert(tRowSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === null) + assert(tRowSet.getRows.get(1).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v1, schema1.head.dataType))) + assert(tRowSet.getRows.get(2).getColVals.get(0).getStringVal.getValue === + SparkSQLUtils.toHiveString((v2, schema1.head.dataType))) + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala index b4247c9e2..ff746bcb5 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil -import yaooqinn.kyuubi.service.ServiceException +import yaooqinn.kyuubi.service.{ServiceException, State} import yaooqinn.kyuubi.utils.ReflectUtils class KyuubiServerSuite extends SparkFunSuite { @@ -77,13 +77,19 @@ class KyuubiServerSuite extends SparkFunSuite { KyuubiServer.setupCommonConfig(conf) val server = new KyuubiServer() server.init(conf) + assert(server.getServiceState === State.INITED) assert(server.feService !== null) assert(server.beService !== null) assert(server.beService.getSessionManager !== null) assert(server.beService.getSessionManager.getOperationMgr !== null) + assert(server.getStartTime === 0) server.start() + assert(server.getServices.nonEmpty) + assert(server.getStartTime !== 0) + assert(server.getServiceState === State.STARTED) assert(ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get) server.stop() + assert(server.getServiceState === State.STOPPED) assert(!ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get) }