From e4e682f1fc7535a570dfcd5ef72802dfd40a26c3 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 26 Mar 2018 15:55:55 +0800 Subject: [PATCH] column based set support --- .../kyuubi/operation/KyuubiOperation.scala | 20 +-- .../kyuubi/operation/OperationManager.scala | 6 +- .../scala/yaooqinn/kyuubi/schema/Column.scala | 22 +++ .../kyuubi/schema/ColumnBasedSet.scala | 126 ++++++++++++++++++ .../yaooqinn/kyuubi/schema/RowBasedSet.scala | 112 ++++++++++++++++ .../scala/yaooqinn/kyuubi/schema/RowSet.scala | 95 +------------ .../kyuubi/schema/RowSetBuilder.scala | 34 +++++ .../kyuubi/session/KyuubiSession.scala | 2 +- ...wSetSuite.scala => RowBasedSetSuite.scala} | 17 ++- 9 files changed, 314 insertions(+), 120 deletions(-) create mode 100644 src/main/scala/yaooqinn/kyuubi/schema/Column.scala create mode 100644 src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala create mode 100644 src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala create mode 100644 src/main/scala/yaooqinn/kyuubi/schema/RowSetBuilder.scala rename src/test/scala/yaooqinn/kyuubi/schema/{RowSetSuite.scala => RowBasedSetSuite.scala} (92%) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 327b4165e..6320f6253 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -19,12 +19,10 @@ package yaooqinn.kyuubi.operation import java.io.{File, FileNotFoundException} import java.security.PrivilegedExceptionAction -import java.sql.{Date, Timestamp} import java.util.UUID import java.util.concurrent.{Future, RejectedExecutionException} import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.hadoop.hive.conf.HiveConf @@ -32,15 +30,15 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControl import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion -import org.apache.spark.KyuubiConf._ import org.apache.spark.SparkUtils -import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} +import org.apache.spark.KyuubiConf._ +import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.types._ import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.cli.FetchOrientation -import yaooqinn.kyuubi.schema.RowSet +import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder} import yaooqinn.kyuubi.session.KyuubiSession import yaooqinn.kyuubi.ui.KyuubiServerMonitor @@ -231,7 +229,8 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } // if not wrap doas ,it will cause NPE while proxy on session.ugi.doAs(new PrivilegedExceptionAction[RowSet] { - override def run(): RowSet = RowSet(getResultSetSchema, taken.toSeq) + override def run(): RowSet = + RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion) }) } @@ -243,10 +242,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging /** * Verify if the given fetch orientation is part of the default orientation types. - * - * @param orientation - * - * @throws HiveSQLException */ @throws[HiveSQLException] private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = { @@ -255,11 +250,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging /** * Verify if the given fetch orientation is part of the supported orientation types. - * - * @param orientation - * @param supportedOrientations - * - * @throws HiveSQLException */ @throws[HiveSQLException] private[this] def validateFetchOrientation( diff --git a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index a24b6ed4e..ee6648b22 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.cli.FetchOrientation -import yaooqinn.kyuubi.schema.RowSet +import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder} import yaooqinn.kyuubi.service.AbstractService import yaooqinn.kyuubi.session.KyuubiSession @@ -157,9 +157,9 @@ private[kyuubi] class OperationManager private(name: String) throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle) } try { - // convert logs to RowSet + // convert logs to RowBasedSet val logs = opLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala.map(Row(_)) - RowSet(logSchema, logs) + RowSetBuilder.create(logSchema, logs, opHandle.getProtocolVersion) } catch { case e: SQLException => throw new HiveSQLException(e.getMessage, e.getCause) diff --git a/src/main/scala/yaooqinn/kyuubi/schema/Column.scala b/src/main/scala/yaooqinn/kyuubi/schema/Column.scala new file mode 100644 index 000000000..212f708e4 --- /dev/null +++ b/src/main/scala/yaooqinn/kyuubi/schema/Column.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.schema + +class Column { + +} diff --git a/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala b/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala new file mode 100644 index 000000000..45c53055e --- /dev/null +++ b/src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.schema + +import java.nio.ByteBuffer +import java.util.BitSet + +import scala.collection.JavaConverters._ + +import org.apache.hive.service.cli.thrift._ +import org.apache.spark.sql.{Row, SparkSQLUtils} +import org.apache.spark.sql.types.{BinaryType, _} + +case class ColumnBasedSet(types: StructType, rows: Seq[Row]) extends RowSet { + import ColumnBasedSet._ + + override def toTRowSet: TRowSet = { + val tRowSet = new TRowSet(0, Seq[TRow]().asJava) + (0 until types.length).map(i => toTColumn(i, types(i).dataType)).foreach(tRowSet.addToColumns) + tRowSet + } + + private[this] def toTColumn(ordinal: Int, typ: DataType): TColumn = { + val nulls = new BitSet() + typ match { + case BooleanType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) true else row.getBoolean(ordinal) + }.map(_.asInstanceOf[java.lang.Boolean]).asJava + TColumn.boolVal(new TBoolColumn(values, bitSetToBuffer(nulls))) + case ByteType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getByte(ordinal) + }.map(_.asInstanceOf[java.lang.Byte]).asJava + TColumn.byteVal(new TByteColumn(values, bitSetToBuffer(nulls))) + case ShortType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getShort(ordinal) + }.map(_.asInstanceOf[java.lang.Short]).asJava + TColumn.i16Val(new TI16Column(values, bitSetToBuffer(nulls))) + case IntegerType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getInt(ordinal) + }.map(_.asInstanceOf[java.lang.Integer]).asJava + TColumn.i32Val(new TI32Column(values, bitSetToBuffer(nulls))) + case LongType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getLong(ordinal) + }.map(_.asInstanceOf[java.lang.Long]).asJava + TColumn.i64Val(new TI64Column(values, bitSetToBuffer(nulls))) + case FloatType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getFloat(ordinal) + }.map(_.asInstanceOf[java.lang.Double]).asJava + TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls))) + case DoubleType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) 0 else row.getDouble(ordinal) + }.map(_.asInstanceOf[java.lang.Double]).asJava + TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls))) + case StringType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) EMPTY_STRING else row.getString(ordinal) + }.asJava + TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls))) + case BinaryType => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) { + EMPTY_BINARY + } else { + ByteBuffer.wrap(row.getAs[Array[Byte]](ordinal)) + } + }.asJava + TColumn.binaryVal(new TBinaryColumn(values, bitSetToBuffer(nulls))) + case _ => + val values = rows.zipWithIndex.map { case (row, i) => + nulls.set(i, row.isNullAt(ordinal)) + if (row.isNullAt(ordinal)) { + EMPTY_STRING + } else { + SparkSQLUtils.toHiveString((row.get(ordinal), typ)) + } + }.asJava + TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls))) + } + } + +} + +object ColumnBasedSet { + private val EMPTY_STRING = "" + private val EMPTY_BINARY = ByteBuffer.allocate(0) + private val MASKS = Array[Byte](0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80.toByte) + + private def bitSetToBuffer(bitset: BitSet): ByteBuffer = { + val nulls = new Array[Byte](1 + (bitset.length / 8)) + (0 until bitset.length).foreach { i => + nulls(i / 8) = (nulls(i / 8) | (if (bitset.get(i)) MASKS(i % 8) else 0.toByte)).toByte + } + ByteBuffer.wrap(nulls) + } +} diff --git a/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala b/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala new file mode 100644 index 000000000..b409f8e67 --- /dev/null +++ b/src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.schema + +import scala.collection.JavaConverters._ + +import org.apache.hive.service.cli.thrift._ +import org.apache.spark.sql.{Row, SparkSQLUtils} +import org.apache.spark.sql.types._ + +/** + * A result set of Spark's [[Row]]s with its [[StructType]] as its schema, with the ability of + * transform to [[TRowSet]]. + */ +case class RowBasedSet(types: StructType, rows: Seq[Row]) extends RowSet { + + override def toTRowSet: TRowSet = new TRowSet(0, toTRows(rows).asJava) + + private[this] def toTRows(rows: Seq[Row]): Seq[TRow] = rows.map(toTRow) + + private[this] def toTRow(row: Row): TRow = { + val tRow = new TRow() + (0 until row.length).map(i => toTColumnValue(i, row)).foreach(tRow.addToColVals) + tRow + } + + private[this] def toTColumnValue(ordinal: Int, row: Row): TColumnValue = + types(ordinal).dataType match { + case BooleanType => + val boolValue = new TBoolValue + if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal)) + TColumnValue.boolVal(boolValue) + + case ByteType => + val byteValue = new TByteValue + if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal)) + TColumnValue.byteVal(byteValue) + + case ShortType => + val tI16Value = new TI16Value + if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal)) + TColumnValue.i16Val(tI16Value) + + case IntegerType => + val tI32Value = new TI32Value + if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal)) + TColumnValue.i32Val(tI32Value) + + case LongType => + val tI64Value = new TI64Value + if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal)) + TColumnValue.i64Val(tI64Value) + + case FloatType => + val tDoubleValue = new TDoubleValue + if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getFloat(ordinal)) + TColumnValue.doubleVal(tDoubleValue) + + case DoubleType => + val tDoubleValue = new TDoubleValue + if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal)) + TColumnValue.doubleVal(tDoubleValue) + + case StringType => + val tStringValue = new TStringValue + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal)) + TColumnValue.stringVal(tStringValue) + + case DecimalType() => + val tStrValue = new TStringValue + if (!row.isNullAt(ordinal)) tStrValue.setValue(row.getDecimal(ordinal).toString) + TColumnValue.stringVal(tStrValue) + + case DateType => + val tStringValue = new TStringValue + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getDate(ordinal).toString) + TColumnValue.stringVal(tStringValue) + + case TimestampType => + val tStringValue = new TStringValue + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getTimestamp(ordinal).toString) + TColumnValue.stringVal(tStringValue) + + case BinaryType => + val tStringValue = new TStringValue + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getAs[Array[Byte]](ordinal).toString) + TColumnValue.stringVal(tStringValue) + + case _: ArrayType | _: StructType | _: MapType => + val tStrValue = new TStringValue + if (!row.isNullAt(ordinal)) { + tStrValue.setValue( + SparkSQLUtils.toHiveString((row.get(ordinal), types(ordinal).dataType))) + } + TColumnValue.stringVal(tStrValue) + } +} diff --git a/src/main/scala/yaooqinn/kyuubi/schema/RowSet.scala b/src/main/scala/yaooqinn/kyuubi/schema/RowSet.scala index ef48d668c..bb29cc2ed 100644 --- a/src/main/scala/yaooqinn/kyuubi/schema/RowSet.scala +++ b/src/main/scala/yaooqinn/kyuubi/schema/RowSet.scala @@ -17,97 +17,8 @@ package yaooqinn.kyuubi.schema -import scala.collection.JavaConverters._ +import org.apache.hive.service.cli.thrift.TRowSet -import org.apache.hive.service.cli.thrift._ -import org.apache.spark.sql.{Row, SparkSQLUtils} -import org.apache.spark.sql.types._ - -/** - * A result set of Spark's [[Row]]s with its [[StructType]] as its schema, with the ability of - * transform to [[TRowSet]]. - */ -case class RowSet(types: StructType, rows: Seq[Row]) { - - def toTRowSet: TRowSet = new TRowSet(0, toTRows(rows).asJava) - - private[this] def toTRows(rows: Seq[Row]): Seq[TRow] = rows.map(toTRow) - - - private[this] def toTRow(row: Row): TRow = { - val tRow = new TRow() - (0 until row.length).map(i => toTColumnValue(i, row)).foreach(tRow.addToColVals) - tRow - } - - private[this] def toTColumnValue(ordinal: Int, row: Row): TColumnValue = - types(ordinal).dataType match { - case BooleanType => - val boolValue = new TBoolValue - if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal)) - TColumnValue.boolVal(boolValue) - - case ByteType => - val byteValue = new TByteValue - if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal)) - TColumnValue.byteVal(byteValue) - - case ShortType => - val tI16Value = new TI16Value - if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal)) - TColumnValue.i16Val(tI16Value) - - case IntegerType => - val tI32Value = new TI32Value - if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal)) - TColumnValue.i32Val(tI32Value) - - case LongType => - val tI64Value = new TI64Value - if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal)) - TColumnValue.i64Val(tI64Value) - - case FloatType => - val tDoubleValue = new TDoubleValue - if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getFloat(ordinal)) - TColumnValue.doubleVal(tDoubleValue) - - case DoubleType => - val tDoubleValue = new TDoubleValue - if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal)) - TColumnValue.doubleVal(tDoubleValue) - - case StringType => - val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal)) - TColumnValue.stringVal(tStringValue) - - case DecimalType() => - val tStrValue = new TStringValue - if (!row.isNullAt(ordinal)) tStrValue.setValue(row.getDecimal(ordinal).toString) - TColumnValue.stringVal(tStrValue) - - case DateType => - val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getDate(ordinal).toString) - TColumnValue.stringVal(tStringValue) - - case TimestampType => - val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getTimestamp(ordinal).toString) - TColumnValue.stringVal(tStringValue) - - case BinaryType => - val tStringValue = new TStringValue - if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getAs[Array[Byte]](ordinal).toString) - TColumnValue.stringVal(tStringValue) - - case _: ArrayType | _: StructType | _: MapType => - val tStrValue = new TStringValue - if (!row.isNullAt(ordinal)) { - tStrValue.setValue( - SparkSQLUtils.toHiveString((row.get(ordinal), types(ordinal).dataType))) - } - TColumnValue.stringVal(tStrValue) - } +trait RowSet { + def toTRowSet: TRowSet } diff --git a/src/main/scala/yaooqinn/kyuubi/schema/RowSetBuilder.scala b/src/main/scala/yaooqinn/kyuubi/schema/RowSetBuilder.scala new file mode 100644 index 000000000..7faaf034a --- /dev/null +++ b/src/main/scala/yaooqinn/kyuubi/schema/RowSetBuilder.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.schema + +import org.apache.hive.service.cli.thrift.TProtocolVersion +import org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6 +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType + +object RowSetBuilder { + + def create(types: StructType, rows: Seq[Row], version: TProtocolVersion): RowSet = { + if (version.getValue >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) { + ColumnBasedSet(types, rows) + } else { + RowBasedSet(types, rows) + } + } +} diff --git a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index d535a8365..6f1c74e18 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -47,7 +47,7 @@ import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.auth.KyuubiAuthFactory import yaooqinn.kyuubi.cli._ import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager} -import yaooqinn.kyuubi.schema.RowSet +import yaooqinn.kyuubi.schema.{RowBasedSet, RowSet} import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor} import yaooqinn.kyuubi.utils.{HadoopUtils, ReflectUtils} diff --git a/src/test/scala/yaooqinn/kyuubi/schema/RowSetSuite.scala b/src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala similarity index 92% rename from src/test/scala/yaooqinn/kyuubi/schema/RowSetSuite.scala rename to src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala index f71d1d934..8fcad1db5 100644 --- a/src/test/scala/yaooqinn/kyuubi/schema/RowSetSuite.scala +++ b/src/test/scala/yaooqinn/kyuubi/schema/RowBasedSetSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType -class RowSetSuite extends SparkFunSuite { +class RowBasedSetSuite extends SparkFunSuite { test("row set basic suites") { val maxRows: Int = 5 @@ -49,7 +49,7 @@ class RowSetSuite extends SparkFunSuite { // fetch next val rowIterator = rows.iterator var taken = rowIterator.take(maxRows).toSeq - var tRowSet = RowSet(schema, taken).toTRowSet + var tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 5) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11") assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22") @@ -58,7 +58,7 @@ class RowSetSuite extends SparkFunSuite { assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55") taken = rowIterator.take(maxRows).toSeq - tRowSet = RowSet(schema, taken).toTRowSet + tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 5) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "66") assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "77") @@ -67,7 +67,7 @@ class RowSetSuite extends SparkFunSuite { assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "000") taken = rowIterator.take(maxRows).toSeq - tRowSet = RowSet(schema, taken).toTRowSet + tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 5) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "111") assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "222") @@ -76,7 +76,7 @@ class RowSetSuite extends SparkFunSuite { assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "555") taken = rowIterator.take(maxRows).toSeq - tRowSet = RowSet(schema, taken).toTRowSet + tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 1) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "666") intercept[IndexOutOfBoundsException](tRowSet.getRows.get(1)) @@ -90,7 +90,7 @@ class RowSetSuite extends SparkFunSuite { val resultList = itr2.toList taken = itr1.toSeq - tRowSet = RowSet(schema, taken).toTRowSet + tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 5) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11") assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22") @@ -99,7 +99,7 @@ class RowSetSuite extends SparkFunSuite { assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55") taken = resultList - tRowSet = RowSet(schema, taken).toTRowSet + tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 5) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11") assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22") @@ -108,7 +108,7 @@ class RowSetSuite extends SparkFunSuite { assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55") taken = resultList - tRowSet = RowSet(schema, taken).toTRowSet + tRowSet = RowBasedSet(schema, taken).toTRowSet assert(tRowSet.getRowsSize === 5) assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11") assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22") @@ -116,5 +116,4 @@ class RowSetSuite extends SparkFunSuite { assert(tRowSet.getRows.get(3).getColVals.get(1).getStringVal.getValue === "44") assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55") } - }