[KYUUBI #1704] Complete Flink-Hive data type mapping

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Currently, only a subset of Flink data types is supported. Queries whose results contain unsupported data types fail with a Thrift TTransportException. This change fixes that by completing the Flink-to-Hive data type mapping.

This is a sub-task of KPIP-2 #1322.
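
To illustrate the scope of the change, here is a minimal sketch of the completed type mapping, mirroring the `match` in the RowSet changes below (the method name `toTTypeId` is assumed for illustration; the cases are taken from the patch):

```scala
import org.apache.flink.table.types.logical._
import org.apache.hive.service.rpc.thrift.TTypeId

// Sketch only: maps a Flink LogicalType to the corresponding Hive Thrift type.
def toTTypeId(typ: LogicalType): TTypeId = typ match {
  case _: BooleanType => TTypeId.BOOLEAN_TYPE
  case _: TinyIntType => TTypeId.TINYINT_TYPE
  case _: SmallIntType => TTypeId.SMALLINT_TYPE
  case _: IntType => TTypeId.INT_TYPE
  case _: BigIntType => TTypeId.BIGINT_TYPE
  case _: FloatType => TTypeId.FLOAT_TYPE
  case _: DoubleType => TTypeId.DOUBLE_TYPE
  case _: DecimalType => TTypeId.DECIMAL_TYPE
  case _: VarCharType => TTypeId.VARCHAR_TYPE
  case _: CharType => TTypeId.CHAR_TYPE
  case _: DateType => TTypeId.DATE_TYPE
  case _: TimestampType => TTypeId.TIMESTAMP_TYPE
  case _: ArrayType => TTypeId.ARRAY_TYPE
  case _: MapType => TTypeId.MAP_TYPE
  case _: RowType => TTypeId.STRUCT_TYPE
  case _: BinaryType => TTypeId.BINARY_TYPE
  case other =>
    // Timezone-aware timestamps, multisets, and intervals remain unsupported.
    throw new IllegalArgumentException(
      s"Flink data type `${other.asSummaryString()}` is not supported currently")
}
```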

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1770 from link3280/feature/KYUUBI-1704.

Closes #1704

2d53a3b1 [Paul Lin] [KYUUBI #1704] Complete Flink-Hive data type mapping

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
Paul Lin 2022-01-17 10:10:50 +08:00 committed by Kent Yao
parent cab0762c6c
commit 4f2696d300
2 changed files with 239 additions and 15 deletions

RowSet.scala (org.apache.kyuubi.engine.flink.schema)

@@ -19,17 +19,22 @@ package org.apache.kyuubi.engine.flink.schema
import java.{lang, util}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{LocalDate, LocalDateTime}
import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import org.apache.flink.table.catalog.Column
-import org.apache.flink.table.types.logical.{DecimalType, _}
+import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.util.RowSetUtils.{dateFormatter, timestampFormatter}
object RowSet {
@@ -70,7 +75,8 @@ object RowSet {
row: Row,
resultSet: ResultSet): TColumnValue = {
-val logicalType = resultSet.getColumns.get(ordinal).getDataType.getLogicalType
+val column = resultSet.getColumns.get(ordinal)
+val logicalType = column.getDataType.getLogicalType
logicalType match {
case _: BooleanType =>
@@ -80,6 +86,12 @@
}
TColumnValue.boolVal(boolValue)
case _: TinyIntType =>
val tByteValue = new TByteValue
if (row.getField(ordinal) != null) {
tByteValue.setValue(row.getField(ordinal).asInstanceOf[Byte])
}
TColumnValue.byteVal(tByteValue)
case _: SmallIntType =>
val tI16Value = new TI16Value
if (row.getField(ordinal) != null) {
tI16Value.setValue(row.getField(ordinal).asInstanceOf[Short])
@@ -88,7 +100,7 @@
case _: IntType =>
val tI32Value = new TI32Value
if (row.getField(ordinal) != null) {
-tI32Value.setValue(row.getField(ordinal).asInstanceOf[Short])
+tI32Value.setValue(row.getField(ordinal).asInstanceOf[Int])
}
TColumnValue.i32Val(tI32Value)
case _: BigIntType =>
@@ -112,21 +124,23 @@
case _: VarCharType =>
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
-tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+val stringValue = row.getField(ordinal).asInstanceOf[String]
+tStringValue.setValue(stringValue)
}
TColumnValue.stringVal(tStringValue)
case _: CharType =>
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
-tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+val stringValue = row.getField(ordinal).asInstanceOf[String]
+tStringValue.setValue(stringValue)
}
TColumnValue.stringVal(tStringValue)
-case _ =>
-val tStrValue = new TStringValue
+case t =>
+val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
-// TODO to be done
+tStringValue.setValue(toHiveString((row.getField(ordinal), t)))
}
-TColumnValue.stringVal(tStrValue)
+TColumnValue.stringVal(tStringValue)
}
}
@@ -141,15 +155,29 @@
val values = getOrSetAsNull[lang.Boolean](rows, ordinal, nulls, true)
TColumn.boolVal(new TBoolColumn(values, nulls))
case _: TinyIntType =>
val values = getOrSetAsNull[lang.Byte](rows, ordinal, nulls, 0.toByte)
TColumn.byteVal(new TByteColumn(values, nulls))
case _: SmallIntType =>
val values = getOrSetAsNull[lang.Short](rows, ordinal, nulls, 0.toShort)
TColumn.i16Val(new TI16Column(values, nulls))
case _: IntType =>
val values = getOrSetAsNull[lang.Integer](rows, ordinal, nulls, 0)
TColumn.i32Val(new TI32Column(values, nulls))
case _: BigIntType =>
val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L)
TColumn.i64Val(new TI64Column(values, nulls))
case _: FloatType =>
val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0)
TColumn.doubleVal(new TDoubleColumn(values, nulls))
case _: DoubleType =>
val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0)
TColumn.doubleVal(new TDoubleColumn(values, nulls))
case _: VarCharType =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))
case _: CharType =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))
case _ =>
val values = rows.zipWithIndex.toList.map { case (row, i) =>
nulls.set(i, row.getField(ordinal) == null)
@@ -209,6 +237,12 @@
Map(
TCLIServiceConstants.PRECISION -> TTypeQualifierValue.i32Value(d.getPrecision),
TCLIServiceConstants.SCALE -> TTypeQualifierValue.i32Value(d.getScale)).asJava
case v: VarCharType =>
Map(TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH ->
TTypeQualifierValue.i32Value(v.getLength)).asJava
case ch: CharType =>
Map(TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH ->
TTypeQualifierValue.i32Value(ch.getLength)).asJava
case _ => Collections.emptyMap[String, TTypeQualifierValue]()
}
ret.setQualifiers(qualifiers)
@@ -220,15 +254,31 @@
case _: BooleanType => TTypeId.BOOLEAN_TYPE
case _: FloatType => TTypeId.FLOAT_TYPE
case _: DoubleType => TTypeId.DOUBLE_TYPE
-case _: VarCharType => TTypeId.STRING_TYPE
-case _: CharType => TTypeId.STRING_TYPE
+case _: VarCharType => TTypeId.VARCHAR_TYPE
+case _: CharType => TTypeId.CHAR_TYPE
case _: TinyIntType => TTypeId.TINYINT_TYPE
case _: SmallIntType => TTypeId.SMALLINT_TYPE
case _: IntType => TTypeId.INT_TYPE
case _: BigIntType => TTypeId.BIGINT_TYPE
case _: DecimalType => TTypeId.DECIMAL_TYPE
case _: DateType => TTypeId.DATE_TYPE
case _: TimestampType => TTypeId.TIMESTAMP_TYPE
case _: ArrayType => TTypeId.ARRAY_TYPE
case _: MapType => TTypeId.MAP_TYPE
case _: RowType => TTypeId.STRUCT_TYPE
case _: BinaryType => TTypeId.BINARY_TYPE
case t @ (_: ZonedTimestampType | _: LocalZonedTimestampType | _: MultisetType |
_: YearMonthIntervalType | _: DayTimeIntervalType) =>
throw new IllegalArgumentException(
"Flink data type `%s` is not supported currently".format(t.asSummaryString()),
null)
case other =>
throw new IllegalArgumentException(s"Unrecognized type name: ${other.asSummaryString()}")
}
/**
* A simpler impl of Flink's toHiveString
* TODO: support Flink's new data type system
*/
def toHiveString(dataWithType: (Any, LogicalType)): String = {
dataWithType match {
@@ -236,9 +286,51 @@
// Only match nulls in nested type values
"null"
case (d: Int, _: DateType) =>
dateFormatter.format(LocalDate.ofEpochDay(d))
case (ld: LocalDate, _: DateType) =>
dateFormatter.format(ld)
case (d: Date, _: DateType) =>
dateFormatter.format(d.toInstant)
case (ldt: LocalDateTime, _: TimestampType) =>
timestampFormatter.format(ldt)
case (ts: Timestamp, _: TimestampType) =>
timestampFormatter.format(ts.toInstant)
case (decimal: java.math.BigDecimal, _: DecimalType) =>
decimal.toPlainString
case (a: Array[_], t: ArrayType) =>
a.map(v => toHiveString((v, t.getElementType))).toSeq.mkString(
"[",
",",
"]")
case (m: Map[_, _], t: MapType) =>
m.map {
case (k, v) =>
toHiveString((k, t.getKeyType)) + ":" + toHiveString((v, t.getValueType))
}
.toSeq.mkString("{", ",", "}")
case (r: Row, t: RowType) =>
val lb = ListBuffer[String]()
for (i <- 0 until r.getArity) {
lb += s"""${t.getTypeAt(i).toString}:${toHiveString((r.getField(i), t.getTypeAt(i)))}"""
}
lb.toList.mkString("{", ",", "}")
case (s: String, _ @(_: VarCharType | _: CharType)) =>
// Only match string in nested type values
"\"" + s + "\""
case (bin: Array[Byte], _: BinaryType) =>
new String(bin, StandardCharsets.UTF_8)
case (other, _) =>
other.toString
}
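
For reference, a minimal usage sketch of the new `toHiveString` (illustrative only; the expected output matches the array assertion in FlinkOperationSuite below):

```scala
import org.apache.flink.table.types.logical.{ArrayType, VarCharType}
import org.apache.kyuubi.engine.flink.schema.RowSet

// Nested strings are double-quoted and elements joined with commas.
// The VARCHAR length is arbitrary for this sketch.
val rendered = RowSet.toHiveString(
  (Array("v1", "v2", "v3"), new ArrayType(new VarCharType(2))))
assert(rendered == """["v1","v2","v3"]""")
```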

FlinkOperationSuite.scala

@@ -198,6 +198,134 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
test("execute statement - select varchar/char") {
withJdbcStatement() { statement =>
val resultSet =
statement.executeQuery("select cast('varchar10' as varchar(10)), " +
"cast('char16' as char(16))")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.VARCHAR)
assert(metaData.getPrecision(1) === 10)
assert(metaData.getColumnType(2) === java.sql.Types.CHAR)
assert(metaData.getPrecision(2) === 16)
assert(resultSet.next())
assert(resultSet.getString(1) === "varchar10")
assert(resultSet.getString(2) === "char16 ")
}
}
test("execute statement - select tinyint") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select cast(1 as tinyint)")
assert(resultSet.next())
assert(resultSet.getByte(1) === 1)
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.TINYINT)
}
}
test("execute statement - select smallint") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select cast(1 as smallint)")
assert(resultSet.next())
assert(resultSet.getShort(1) === 1)
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.SMALLINT)
}
}
test("execute statement - select int") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 1")
assert(resultSet.next())
assert(resultSet.getInt(1) === 1)
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.INTEGER)
}
}
test("execute statement - select bigint") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select cast(1 as bigint)")
assert(resultSet.next())
assert(resultSet.getLong(1) === 1)
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.BIGINT)
}
}
test("execute statement - select date") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select date '2022-01-01'")
assert(resultSet.next())
assert(resultSet.getDate(1).toLocalDate.toString == "2022-01-01")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.DATE)
}
}
test("execute statement - select timestamp") {
withJdbcStatement() { statement =>
val resultSet =
statement.executeQuery(
"select timestamp '2022-01-01 00:00:00', timestamp '2022-01-01 00:00:00.123456789'")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
assert(metaData.getColumnType(2) === java.sql.Types.TIMESTAMP)
// 9 digits for fraction of seconds
assert(metaData.getPrecision(1) == 29)
assert(metaData.getPrecision(2) == 29)
assert(resultSet.next())
assert(resultSet.getTimestamp(1).toString == "2022-01-01 00:00:00.0")
assert(resultSet.getTimestamp(2).toString == "2022-01-01 00:00:00.123456789")
}
}
test("execute statement - select array") {
withJdbcStatement() { statement =>
val resultSet =
statement.executeQuery("select array ['v1', 'v2', 'v3']")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.ARRAY)
assert(resultSet.next())
assert(resultSet.getObject(1).toString == "[\"v1\",\"v2\",\"v3\"]")
}
}
test("execute statement - select map") {
withJdbcStatement() { statement =>
val resultSet =
statement.executeQuery("select map ['k1', 'v1', 'k2', 'v2']")
assert(resultSet.next())
assert(resultSet.getString(1) == "{k1=v1, k2=v2}")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.JAVA_OBJECT)
}
}
test("execute statement - select row") {
withJdbcStatement() { statement =>
val resultSet =
statement.executeQuery("select (1, '2', true)")
assert(resultSet.next())
assert(
resultSet.getString(1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:\"2\",BOOLEAN NOT NULL:true}")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.STRUCT)
}
}
test("execute statement - select binary") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select encode('kyuubi', 'UTF-8')")
assert(resultSet.next())
assert(
resultSet.getString(1) == "kyuubi")
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.BINARY)
}
}
test("execute statement - show functions") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("show functions")
@@ -270,11 +398,15 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
})
}
-ignore("execute statement - insert into") {
-// TODO: ignore temporally due to KYUUBI #1704
+test("execute statement - insert into") {
withMultipleConnectionJdbcStatement()({ statement =>
statement.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
-statement.executeUpdate("insert into tbl_a select 1")
+val resultSet = statement.executeQuery("insert into tbl_a select 1")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) == "default_catalog.default_database.tbl_a")
assert(metadata.getColumnType(1) == java.sql.Types.BIGINT)
assert(resultSet.next())
assert(resultSet.getLong(1) == -1L)
})
}
}