column based set support
This commit is contained in:
parent
8d05c989ec
commit
e4e682f1fc
@ -19,12 +19,10 @@ package yaooqinn.kyuubi.operation
|
||||
|
||||
import java.io.{File, FileNotFoundException}
|
||||
import java.security.PrivilegedExceptionAction
|
||||
import java.sql.{Date, Timestamp}
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.{Future, RejectedExecutionException}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
@ -32,15 +30,15 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControl
|
||||
import org.apache.hadoop.hive.ql.session.OperationLog
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.SparkUtils
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.cli.FetchOrientation
|
||||
import yaooqinn.kyuubi.schema.RowSet
|
||||
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
|
||||
import yaooqinn.kyuubi.session.KyuubiSession
|
||||
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
|
||||
|
||||
@ -231,7 +229,8 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
// if not wrap doas ,it will cause NPE while proxy on
|
||||
session.ugi.doAs(new PrivilegedExceptionAction[RowSet] {
|
||||
override def run(): RowSet = RowSet(getResultSetSchema, taken.toSeq)
|
||||
override def run(): RowSet =
|
||||
RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion)
|
||||
})
|
||||
|
||||
}
|
||||
@ -243,10 +242,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
|
||||
/**
|
||||
* Verify if the given fetch orientation is part of the default orientation types.
|
||||
*
|
||||
* @param orientation
|
||||
*
|
||||
* @throws HiveSQLException
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
|
||||
@ -255,11 +250,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
|
||||
/**
|
||||
* Verify if the given fetch orientation is part of the supported orientation types.
|
||||
*
|
||||
* @param orientation
|
||||
* @param supportedOrientations
|
||||
*
|
||||
* @throws HiveSQLException
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
private[this] def validateFetchOrientation(
|
||||
|
||||
@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.cli.FetchOrientation
|
||||
import yaooqinn.kyuubi.schema.RowSet
|
||||
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
|
||||
import yaooqinn.kyuubi.service.AbstractService
|
||||
import yaooqinn.kyuubi.session.KyuubiSession
|
||||
|
||||
@ -157,9 +157,9 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle)
|
||||
}
|
||||
try {
|
||||
// convert logs to RowSet
|
||||
// convert logs to RowBasedSet
|
||||
val logs = opLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala.map(Row(_))
|
||||
RowSet(logSchema, logs)
|
||||
RowSetBuilder.create(logSchema, logs, opHandle.getProtocolVersion)
|
||||
} catch {
|
||||
case e: SQLException =>
|
||||
throw new HiveSQLException(e.getMessage, e.getCause)
|
||||
|
||||
22
src/main/scala/yaooqinn/kyuubi/schema/Column.scala
Normal file
22
src/main/scala/yaooqinn/kyuubi/schema/Column.scala
Normal file
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.schema
|
||||
|
||||
class Column {
|
||||
|
||||
}
|
||||
126
src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala
Normal file
126
src/main/scala/yaooqinn/kyuubi/schema/ColumnBasedSet.scala
Normal file
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.schema
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.BitSet
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hive.service.cli.thrift._
|
||||
import org.apache.spark.sql.{Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.types.{BinaryType, _}
|
||||
|
||||
case class ColumnBasedSet(types: StructType, rows: Seq[Row]) extends RowSet {
|
||||
import ColumnBasedSet._
|
||||
|
||||
override def toTRowSet: TRowSet = {
|
||||
val tRowSet = new TRowSet(0, Seq[TRow]().asJava)
|
||||
(0 until types.length).map(i => toTColumn(i, types(i).dataType)).foreach(tRowSet.addToColumns)
|
||||
tRowSet
|
||||
}
|
||||
|
||||
private[this] def toTColumn(ordinal: Int, typ: DataType): TColumn = {
|
||||
val nulls = new BitSet()
|
||||
typ match {
|
||||
case BooleanType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) true else row.getBoolean(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Boolean]).asJava
|
||||
TColumn.boolVal(new TBoolColumn(values, bitSetToBuffer(nulls)))
|
||||
case ByteType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getByte(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Byte]).asJava
|
||||
TColumn.byteVal(new TByteColumn(values, bitSetToBuffer(nulls)))
|
||||
case ShortType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getShort(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Short]).asJava
|
||||
TColumn.i16Val(new TI16Column(values, bitSetToBuffer(nulls)))
|
||||
case IntegerType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getInt(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Integer]).asJava
|
||||
TColumn.i32Val(new TI32Column(values, bitSetToBuffer(nulls)))
|
||||
case LongType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getLong(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Long]).asJava
|
||||
TColumn.i64Val(new TI64Column(values, bitSetToBuffer(nulls)))
|
||||
case FloatType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getFloat(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Double]).asJava
|
||||
TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls)))
|
||||
case DoubleType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) 0 else row.getDouble(ordinal)
|
||||
}.map(_.asInstanceOf[java.lang.Double]).asJava
|
||||
TColumn.doubleVal(new TDoubleColumn(values, bitSetToBuffer(nulls)))
|
||||
case StringType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) EMPTY_STRING else row.getString(ordinal)
|
||||
}.asJava
|
||||
TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls)))
|
||||
case BinaryType =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) {
|
||||
EMPTY_BINARY
|
||||
} else {
|
||||
ByteBuffer.wrap(row.getAs[Array[Byte]](ordinal))
|
||||
}
|
||||
}.asJava
|
||||
TColumn.binaryVal(new TBinaryColumn(values, bitSetToBuffer(nulls)))
|
||||
case _ =>
|
||||
val values = rows.zipWithIndex.map { case (row, i) =>
|
||||
nulls.set(i, row.isNullAt(ordinal))
|
||||
if (row.isNullAt(ordinal)) {
|
||||
EMPTY_STRING
|
||||
} else {
|
||||
SparkSQLUtils.toHiveString((row.get(ordinal), typ))
|
||||
}
|
||||
}.asJava
|
||||
TColumn.stringVal(new TStringColumn(values, bitSetToBuffer(nulls)))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object ColumnBasedSet {
|
||||
private val EMPTY_STRING = ""
|
||||
private val EMPTY_BINARY = ByteBuffer.allocate(0)
|
||||
private val MASKS = Array[Byte](0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80.toByte)
|
||||
|
||||
private def bitSetToBuffer(bitset: BitSet): ByteBuffer = {
|
||||
val nulls = new Array[Byte](1 + (bitset.length / 8))
|
||||
(0 until bitset.length).foreach { i =>
|
||||
nulls(i / 8) = (nulls(i / 8) | (if (bitset.get(i)) MASKS(i % 8) else 0.toByte)).toByte
|
||||
}
|
||||
ByteBuffer.wrap(nulls)
|
||||
}
|
||||
}
|
||||
112
src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala
Normal file
112
src/main/scala/yaooqinn/kyuubi/schema/RowBasedSet.scala
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.schema
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hive.service.cli.thrift._
|
||||
import org.apache.spark.sql.{Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
* A result set of Spark's [[Row]]s with its [[StructType]] as its schema, with the ability of
|
||||
* transform to [[TRowSet]].
|
||||
*/
|
||||
case class RowBasedSet(types: StructType, rows: Seq[Row]) extends RowSet {
|
||||
|
||||
override def toTRowSet: TRowSet = new TRowSet(0, toTRows(rows).asJava)
|
||||
|
||||
private[this] def toTRows(rows: Seq[Row]): Seq[TRow] = rows.map(toTRow)
|
||||
|
||||
private[this] def toTRow(row: Row): TRow = {
|
||||
val tRow = new TRow()
|
||||
(0 until row.length).map(i => toTColumnValue(i, row)).foreach(tRow.addToColVals)
|
||||
tRow
|
||||
}
|
||||
|
||||
private[this] def toTColumnValue(ordinal: Int, row: Row): TColumnValue =
|
||||
types(ordinal).dataType match {
|
||||
case BooleanType =>
|
||||
val boolValue = new TBoolValue
|
||||
if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal))
|
||||
TColumnValue.boolVal(boolValue)
|
||||
|
||||
case ByteType =>
|
||||
val byteValue = new TByteValue
|
||||
if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal))
|
||||
TColumnValue.byteVal(byteValue)
|
||||
|
||||
case ShortType =>
|
||||
val tI16Value = new TI16Value
|
||||
if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal))
|
||||
TColumnValue.i16Val(tI16Value)
|
||||
|
||||
case IntegerType =>
|
||||
val tI32Value = new TI32Value
|
||||
if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal))
|
||||
TColumnValue.i32Val(tI32Value)
|
||||
|
||||
case LongType =>
|
||||
val tI64Value = new TI64Value
|
||||
if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal))
|
||||
TColumnValue.i64Val(tI64Value)
|
||||
|
||||
case FloatType =>
|
||||
val tDoubleValue = new TDoubleValue
|
||||
if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getFloat(ordinal))
|
||||
TColumnValue.doubleVal(tDoubleValue)
|
||||
|
||||
case DoubleType =>
|
||||
val tDoubleValue = new TDoubleValue
|
||||
if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal))
|
||||
TColumnValue.doubleVal(tDoubleValue)
|
||||
|
||||
case StringType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal))
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case DecimalType() =>
|
||||
val tStrValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStrValue.setValue(row.getDecimal(ordinal).toString)
|
||||
TColumnValue.stringVal(tStrValue)
|
||||
|
||||
case DateType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getDate(ordinal).toString)
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case TimestampType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getTimestamp(ordinal).toString)
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case BinaryType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getAs[Array[Byte]](ordinal).toString)
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case _: ArrayType | _: StructType | _: MapType =>
|
||||
val tStrValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) {
|
||||
tStrValue.setValue(
|
||||
SparkSQLUtils.toHiveString((row.get(ordinal), types(ordinal).dataType)))
|
||||
}
|
||||
TColumnValue.stringVal(tStrValue)
|
||||
}
|
||||
}
|
||||
@ -17,97 +17,8 @@
|
||||
|
||||
package yaooqinn.kyuubi.schema
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import org.apache.hive.service.cli.thrift.TRowSet
|
||||
|
||||
import org.apache.hive.service.cli.thrift._
|
||||
import org.apache.spark.sql.{Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
* A result set of Spark's [[Row]]s with its [[StructType]] as its schema, with the ability of
|
||||
* transform to [[TRowSet]].
|
||||
*/
|
||||
case class RowSet(types: StructType, rows: Seq[Row]) {
|
||||
|
||||
def toTRowSet: TRowSet = new TRowSet(0, toTRows(rows).asJava)
|
||||
|
||||
private[this] def toTRows(rows: Seq[Row]): Seq[TRow] = rows.map(toTRow)
|
||||
|
||||
|
||||
private[this] def toTRow(row: Row): TRow = {
|
||||
val tRow = new TRow()
|
||||
(0 until row.length).map(i => toTColumnValue(i, row)).foreach(tRow.addToColVals)
|
||||
tRow
|
||||
}
|
||||
|
||||
private[this] def toTColumnValue(ordinal: Int, row: Row): TColumnValue =
|
||||
types(ordinal).dataType match {
|
||||
case BooleanType =>
|
||||
val boolValue = new TBoolValue
|
||||
if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal))
|
||||
TColumnValue.boolVal(boolValue)
|
||||
|
||||
case ByteType =>
|
||||
val byteValue = new TByteValue
|
||||
if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal))
|
||||
TColumnValue.byteVal(byteValue)
|
||||
|
||||
case ShortType =>
|
||||
val tI16Value = new TI16Value
|
||||
if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal))
|
||||
TColumnValue.i16Val(tI16Value)
|
||||
|
||||
case IntegerType =>
|
||||
val tI32Value = new TI32Value
|
||||
if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal))
|
||||
TColumnValue.i32Val(tI32Value)
|
||||
|
||||
case LongType =>
|
||||
val tI64Value = new TI64Value
|
||||
if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal))
|
||||
TColumnValue.i64Val(tI64Value)
|
||||
|
||||
case FloatType =>
|
||||
val tDoubleValue = new TDoubleValue
|
||||
if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getFloat(ordinal))
|
||||
TColumnValue.doubleVal(tDoubleValue)
|
||||
|
||||
case DoubleType =>
|
||||
val tDoubleValue = new TDoubleValue
|
||||
if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal))
|
||||
TColumnValue.doubleVal(tDoubleValue)
|
||||
|
||||
case StringType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal))
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case DecimalType() =>
|
||||
val tStrValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStrValue.setValue(row.getDecimal(ordinal).toString)
|
||||
TColumnValue.stringVal(tStrValue)
|
||||
|
||||
case DateType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getDate(ordinal).toString)
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case TimestampType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getTimestamp(ordinal).toString)
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case BinaryType =>
|
||||
val tStringValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getAs[Array[Byte]](ordinal).toString)
|
||||
TColumnValue.stringVal(tStringValue)
|
||||
|
||||
case _: ArrayType | _: StructType | _: MapType =>
|
||||
val tStrValue = new TStringValue
|
||||
if (!row.isNullAt(ordinal)) {
|
||||
tStrValue.setValue(
|
||||
SparkSQLUtils.toHiveString((row.get(ordinal), types(ordinal).dataType)))
|
||||
}
|
||||
TColumnValue.stringVal(tStrValue)
|
||||
}
|
||||
trait RowSet {
|
||||
def toTRowSet: TRowSet
|
||||
}
|
||||
|
||||
34
src/main/scala/yaooqinn/kyuubi/schema/RowSetBuilder.scala
Normal file
34
src/main/scala/yaooqinn/kyuubi/schema/RowSetBuilder.scala
Normal file
@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.schema
|
||||
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
object RowSetBuilder {
|
||||
|
||||
def create(types: StructType, rows: Seq[Row], version: TProtocolVersion): RowSet = {
|
||||
if (version.getValue >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
|
||||
ColumnBasedSet(types, rows)
|
||||
} else {
|
||||
RowBasedSet(types, rows)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -47,7 +47,7 @@ import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.auth.KyuubiAuthFactory
|
||||
import yaooqinn.kyuubi.cli._
|
||||
import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager}
|
||||
import yaooqinn.kyuubi.schema.RowSet
|
||||
import yaooqinn.kyuubi.schema.{RowBasedSet, RowSet}
|
||||
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
|
||||
import yaooqinn.kyuubi.utils.{HadoopUtils, ReflectUtils}
|
||||
|
||||
|
||||
@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
class RowSetSuite extends SparkFunSuite {
|
||||
class RowBasedSetSuite extends SparkFunSuite {
|
||||
|
||||
test("row set basic suites") {
|
||||
val maxRows: Int = 5
|
||||
@ -49,7 +49,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
// fetch next
|
||||
val rowIterator = rows.iterator
|
||||
var taken = rowIterator.take(maxRows).toSeq
|
||||
var tRowSet = RowSet(schema, taken).toTRowSet
|
||||
var tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 5)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11")
|
||||
assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22")
|
||||
@ -58,7 +58,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55")
|
||||
|
||||
taken = rowIterator.take(maxRows).toSeq
|
||||
tRowSet = RowSet(schema, taken).toTRowSet
|
||||
tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 5)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "66")
|
||||
assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "77")
|
||||
@ -67,7 +67,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "000")
|
||||
|
||||
taken = rowIterator.take(maxRows).toSeq
|
||||
tRowSet = RowSet(schema, taken).toTRowSet
|
||||
tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 5)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "111")
|
||||
assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "222")
|
||||
@ -76,7 +76,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "555")
|
||||
|
||||
taken = rowIterator.take(maxRows).toSeq
|
||||
tRowSet = RowSet(schema, taken).toTRowSet
|
||||
tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 1)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "666")
|
||||
intercept[IndexOutOfBoundsException](tRowSet.getRows.get(1))
|
||||
@ -90,7 +90,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
val resultList = itr2.toList
|
||||
|
||||
taken = itr1.toSeq
|
||||
tRowSet = RowSet(schema, taken).toTRowSet
|
||||
tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 5)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11")
|
||||
assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22")
|
||||
@ -99,7 +99,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55")
|
||||
|
||||
taken = resultList
|
||||
tRowSet = RowSet(schema, taken).toTRowSet
|
||||
tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 5)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11")
|
||||
assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22")
|
||||
@ -108,7 +108,7 @@ class RowSetSuite extends SparkFunSuite {
|
||||
assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55")
|
||||
|
||||
taken = resultList
|
||||
tRowSet = RowSet(schema, taken).toTRowSet
|
||||
tRowSet = RowBasedSet(schema, taken).toTRowSet
|
||||
assert(tRowSet.getRowsSize === 5)
|
||||
assert(tRowSet.getRows.get(0).getColVals.get(1).getStringVal.getValue === "11")
|
||||
assert(tRowSet.getRows.get(1).getColVals.get(1).getStringVal.getValue === "22")
|
||||
@ -116,5 +116,4 @@ class RowSetSuite extends SparkFunSuite {
|
||||
assert(tRowSet.getRows.get(3).getColVals.get(1).getStringVal.getValue === "44")
|
||||
assert(tRowSet.getRows.get(4).getColVals.get(1).getStringVal.getValue === "55")
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user