[KYUUBI #1582] Use ClientTypeSignature to further analysis of trino column type
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Use ClientTypeSignature to further analysis of trino column type ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1598 from hddong/use-ClientTypeSignature. Closes #1582 36ef139e [hongdongdong] Add check 22060208 [hongdongdong] remove guard 337f11ee [hongdongdong] [KYUUBI #1582] Use ClientTypeSignature to further analysis of trino column type Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
3ffb12de24
commit
fcc6471fec
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine.trino.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
public class PreconditionsWrapper {
|
||||
/**
|
||||
* To avoid ambiguous reference to overloaded definition in scala. {@link
|
||||
* Preconditions#checkArgument(boolean, Object)} {@link Preconditions#checkArgument(boolean,
|
||||
* String, Object...)}
|
||||
*/
|
||||
public static void checkArgument(boolean expression, Object errorMessage) {
|
||||
Preconditions.checkArgument(expression, errorMessage);
|
||||
}
|
||||
}
|
||||
@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import io.trino.client.ClientStandardTypes._
|
||||
import io.trino.client.ClientTypeSignature
|
||||
import io.trino.client.Column
|
||||
import io.trino.client.Row
|
||||
import org.apache.hive.service.rpc.thrift.TBinaryColumn
|
||||
@ -46,6 +47,7 @@ import org.apache.hive.service.rpc.thrift.TRowSet
|
||||
import org.apache.hive.service.rpc.thrift.TStringColumn
|
||||
import org.apache.hive.service.rpc.thrift.TStringValue
|
||||
|
||||
import org.apache.kyuubi.engine.trino.util.PreconditionsWrapper._
|
||||
import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
|
||||
|
||||
object RowSet {
|
||||
@ -78,7 +80,7 @@ object RowSet {
|
||||
val tColumn = toTColumn(
|
||||
rows,
|
||||
i,
|
||||
filed.getType)
|
||||
filed.getTypeSignature)
|
||||
tRowSet.addToColumns(tColumn)
|
||||
}
|
||||
tRowSet
|
||||
@ -87,9 +89,9 @@ object RowSet {
|
||||
private def toTColumn(
|
||||
rows: Seq[Seq[Any]],
|
||||
ordinal: Int,
|
||||
typ: String): TColumn = {
|
||||
typ: ClientTypeSignature): TColumn = {
|
||||
val nulls = new java.util.BitSet()
|
||||
typ match {
|
||||
typ.getRawType match {
|
||||
case BOOLEAN =>
|
||||
val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
|
||||
TColumn.boolVal(new TBoolColumn(values, nulls))
|
||||
@ -136,7 +138,7 @@ object RowSet {
|
||||
if (row(ordinal) == null) {
|
||||
""
|
||||
} else {
|
||||
toHiveString((row(ordinal), typ))
|
||||
toHiveString(row(ordinal), typ)
|
||||
}
|
||||
}.asJava
|
||||
TColumn.stringVal(new TStringColumn(values, nulls))
|
||||
@ -170,7 +172,7 @@ object RowSet {
|
||||
row: List[Any],
|
||||
types: List[Column]): TColumnValue = {
|
||||
|
||||
types(ordinal).getType match {
|
||||
types(ordinal).getTypeSignature.getRawType match {
|
||||
case BOOLEAN =>
|
||||
val boolValue = new TBoolValue
|
||||
if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
|
||||
@ -215,7 +217,7 @@ object RowSet {
|
||||
val tStrValue = new TStringValue
|
||||
if (row(ordinal) != null) {
|
||||
tStrValue.setValue(
|
||||
toHiveString((row(ordinal), types(ordinal).getType)))
|
||||
toHiveString(row(ordinal), types(ordinal).getTypeSignature))
|
||||
}
|
||||
TColumnValue.stringVal(tStrValue)
|
||||
}
|
||||
@ -224,8 +226,8 @@ object RowSet {
|
||||
/**
|
||||
* A simpler impl of Trino's toHiveString
|
||||
*/
|
||||
def toHiveString(dataWithType: (Any, String)): String = {
|
||||
dataWithType match {
|
||||
def toHiveString(data: Any, typ: ClientTypeSignature): String = {
|
||||
(data, typ.getRawType) match {
|
||||
case (null, _) =>
|
||||
// Only match nulls in nested type values
|
||||
"null"
|
||||
@ -237,46 +239,41 @@ object RowSet {
|
||||
// Only match string in nested type values
|
||||
"\"" + s + "\""
|
||||
|
||||
// for Array Map and Row, temporarily convert to string
|
||||
// TODO further analysis of type
|
||||
case (list: java.util.List[_], _) =>
|
||||
formatValue(list)
|
||||
case (list: java.util.List[_], ARRAY) =>
|
||||
checkArgument(
|
||||
typ.getArgumentsAsTypeSignatures.asScala.nonEmpty,
|
||||
"Missing ARRAY argument type")
|
||||
val listType = typ.getArgumentsAsTypeSignatures.get(0)
|
||||
list.asScala
|
||||
.map(toHiveString(_, listType))
|
||||
.mkString("[", ",", "]")
|
||||
|
||||
case (m: java.util.Map[_, _], _) =>
|
||||
formatValue(m)
|
||||
case (m: java.util.Map[_, _], MAP) =>
|
||||
checkArgument(
|
||||
typ.getArgumentsAsTypeSignatures.size() == 2,
|
||||
"Mismatched number of MAP argument types")
|
||||
val keyType = typ.getArgumentsAsTypeSignatures.get(0)
|
||||
val valueType = typ.getArgumentsAsTypeSignatures.get(1)
|
||||
m.asScala.map { case (key, value) =>
|
||||
toHiveString(key, keyType) + ":" + toHiveString(value, valueType)
|
||||
}.toSeq.sorted.mkString("{", ",", "}")
|
||||
|
||||
case (row: Row, _) =>
|
||||
formatValue(row)
|
||||
case (row: Row, ROW) =>
|
||||
checkArgument(
|
||||
row.getFields.size() == typ.getArguments.size(),
|
||||
"Mismatched data values and ROW type")
|
||||
row.getFields.asScala.zipWithIndex.map { case (r, index) =>
|
||||
val namedRowType = typ.getArguments.get(index).getNamedTypeSignature
|
||||
if (namedRowType.getName.isPresent) {
|
||||
namedRowType.getName.get() + "=" +
|
||||
toHiveString(r.getValue, namedRowType.getTypeSignature)
|
||||
} else {
|
||||
toHiveString(r.getValue, namedRowType.getTypeSignature)
|
||||
}
|
||||
}.mkString("{", ",", "}")
|
||||
|
||||
case (other, _) =>
|
||||
other.toString
|
||||
}
|
||||
}
|
||||
|
||||
def formatValue(o: Any): String = {
|
||||
o match {
|
||||
case null =>
|
||||
"null"
|
||||
|
||||
case m: java.util.Map[_, _] =>
|
||||
m.asScala.map { case (key, value) =>
|
||||
formatValue(key) + ":" + formatValue(value)
|
||||
}.toSeq.sorted.mkString("{", ",", "}")
|
||||
|
||||
case l: java.util.List[_] =>
|
||||
l.asScala.map(formatValue).mkString("[", ",", "]")
|
||||
|
||||
case row: Row =>
|
||||
row.getFields.asScala.map { r =>
|
||||
val formattedValue = formatValue(r.getValue())
|
||||
if (r.getName.isPresent) {
|
||||
r.getName.get() + "=" + formattedValue
|
||||
} else {
|
||||
formattedValue
|
||||
}
|
||||
}.mkString("{", ",", "}")
|
||||
|
||||
case _ => o.toString
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,13 +21,17 @@ import java.nio.ByteBuffer
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.sql.Date
|
||||
import java.sql.Time
|
||||
import java.util.Optional
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import io.trino.client.ClientStandardTypes._
|
||||
import io.trino.client.ClientTypeSignature
|
||||
import io.trino.client.ClientTypeSignatureParameter
|
||||
import io.trino.client.Column
|
||||
import io.trino.client.NamedClientTypeSignature
|
||||
import io.trino.client.Row
|
||||
import io.trino.client.RowFieldName
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
@ -38,6 +42,28 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
|
||||
final private val UUID_PREFIX = "486bb66f-1206-49e3-993f-0db68f3cd8"
|
||||
|
||||
lazy val arrayTypeSignature: ClientTypeSignature = new ClientTypeSignature(
|
||||
ARRAY,
|
||||
List(ClientTypeSignatureParameter.ofType(new ClientTypeSignature(DOUBLE))).asJava)
|
||||
|
||||
lazy val mapTypeSignature: ClientTypeSignature = new ClientTypeSignature(
|
||||
MAP,
|
||||
List(
|
||||
ClientTypeSignatureParameter.ofType(new ClientTypeSignature(INTEGER)),
|
||||
ClientTypeSignatureParameter.ofType(new ClientTypeSignature(DOUBLE))).asJava)
|
||||
|
||||
lazy val rowTypeSignature: ClientTypeSignature = new ClientTypeSignature(
|
||||
ROW,
|
||||
List(
|
||||
ClientTypeSignatureParameter.ofNamedType(
|
||||
new NamedClientTypeSignature(
|
||||
Optional.of(new RowFieldName("foo")),
|
||||
new ClientTypeSignature(VARCHAR))),
|
||||
ClientTypeSignatureParameter.ofNamedType(
|
||||
new NamedClientTypeSignature(
|
||||
Optional.of(new RowFieldName("bar")),
|
||||
mapTypeSignature))).asJava)
|
||||
|
||||
def genRow(value: Int): List[_] = {
|
||||
val boolVal = value % 3 match {
|
||||
case 0 => true
|
||||
@ -61,7 +87,9 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
val arrVal = Array.fill(value)(doubleVal).toList.asJava
|
||||
val mapVal = Map(value -> doubleVal).asJava
|
||||
val jsonVal = s"""{"$value": $value}"""
|
||||
val rowVal = Row.builder().addField(value.toString, value).build()
|
||||
val rowVal = Row.builder()
|
||||
.addField("", value.toString)
|
||||
.addField("", mapVal).build()
|
||||
val ipVal = s"${value}.${value}.${value}.${value}"
|
||||
val uuidVal = java.util.UUID.fromString(
|
||||
s"$UUID_PREFIX${uuidSuffix(value)}")
|
||||
@ -106,16 +134,20 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
column("m", VARBINARY),
|
||||
column("n", VARCHAR),
|
||||
column("o", CHAR),
|
||||
column("p", ROW),
|
||||
column("q", ARRAY),
|
||||
column("r", MAP),
|
||||
column("p", ROW, rowTypeSignature),
|
||||
column("q", ARRAY, arrayTypeSignature),
|
||||
column("r", MAP, mapTypeSignature),
|
||||
column("s", JSON),
|
||||
column("t", IPADDRESS),
|
||||
column("u", UUID))
|
||||
|
||||
private val rows: Seq[List[_]] = (0 to 10).map(genRow) ++ Seq(List.fill(21)(null))
|
||||
|
||||
def column(name: String, tp: String): Column = new Column(name, tp, new ClientTypeSignature(tp))
|
||||
def column(name: String, tp: String): Column = column(name, tp, new ClientTypeSignature(tp))
|
||||
|
||||
def column(name: String, tp: String, signature: ClientTypeSignature): Column = {
|
||||
new Column(name, tp, signature)
|
||||
}
|
||||
|
||||
def uuidSuffix(value: Int): String = if (value > 9) value.toString else s"f$value"
|
||||
|
||||
@ -164,7 +196,7 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
dateCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b.isEmpty)
|
||||
case (b, i) =>
|
||||
assert(b === toHiveString((Date.valueOf(s"2018-11-${i + 1}"), DATE)))
|
||||
assert(b === toHiveString(Date.valueOf(s"2018-11-${i + 1}"), new ClientTypeSignature(DATE)))
|
||||
}
|
||||
|
||||
val decCol = cols.next().getStringVal
|
||||
@ -200,7 +232,8 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
val timeCol = cols.next().getStringVal
|
||||
timeCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b.isEmpty)
|
||||
case (b, i) => assert(b === toHiveString((Time.valueOf(s"13:33:${i + 1}"), TIME)))
|
||||
case (b, i) => assert(b ===
|
||||
toHiveString(Time.valueOf(s"13:33:${i + 1}"), new ClientTypeSignature(TIME)))
|
||||
}
|
||||
|
||||
val binCol = cols.next().getBinaryVal
|
||||
@ -224,43 +257,48 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
val rowCol = cols.next().getStringVal
|
||||
rowCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b.isEmpty)
|
||||
case (b, i) => assert(b ===
|
||||
toHiveString((Row.builder().addField(i.toString, i).build(), ROW)))
|
||||
case (b, i) => assert(b === toHiveString(
|
||||
Row.builder().addField("", i.toString).addField(
|
||||
"",
|
||||
Map(i -> java.lang.Double.valueOf(s"$i.$i")).asJava).build(),
|
||||
rowTypeSignature))
|
||||
}
|
||||
|
||||
val arrCol = cols.next().getStringVal
|
||||
arrCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b === "")
|
||||
case (b, i) => assert(b === toHiveString(
|
||||
(Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toList.asJava, ARRAY)))
|
||||
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toList.asJava,
|
||||
arrayTypeSignature))
|
||||
}
|
||||
|
||||
val mapCol = cols.next().getStringVal
|
||||
mapCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b === "")
|
||||
case (b, i) => assert(b === toHiveString(
|
||||
(Map(i -> java.lang.Double.valueOf(s"$i.$i")).asJava, MAP)))
|
||||
Map(i -> java.lang.Double.valueOf(s"$i.$i")).asJava,
|
||||
mapTypeSignature))
|
||||
}
|
||||
|
||||
val jsonCol = cols.next().getStringVal
|
||||
jsonCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b === "")
|
||||
case (b, i) => assert(b ===
|
||||
toHiveString((s"""{"$i": $i}""", JSON)))
|
||||
toHiveString(s"""{"$i": $i}""", new ClientTypeSignature(JSON)))
|
||||
}
|
||||
|
||||
val ipCol = cols.next().getStringVal
|
||||
ipCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b === "")
|
||||
case (b, i) => assert(b ===
|
||||
toHiveString((s"${i}.${i}.${i}.${i}", IPADDRESS)))
|
||||
toHiveString(s"${i}.${i}.${i}.${i}", new ClientTypeSignature(IPADDRESS)))
|
||||
}
|
||||
|
||||
val uuidCol = cols.next().getStringVal
|
||||
uuidCol.getValues.asScala.zipWithIndex.foreach {
|
||||
case (b, 11) => assert(b === "")
|
||||
case (b, i) => assert(b ===
|
||||
toHiveString((s"$UUID_PREFIX${uuidSuffix(i)}", UUID)))
|
||||
toHiveString(s"$UUID_PREFIX${uuidSuffix(i)}", new ClientTypeSignature(UUID)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -308,10 +346,12 @@ class RowSetSuite extends KyuubiFunSuite {
|
||||
assert(r9.get(14).getStringVal.getValue === String.format(s"%10s", 8.toString))
|
||||
|
||||
val r10 = iter.next().getColVals
|
||||
val mapStr =
|
||||
Map(9 -> 9.9d).map { case (key, value) => s"$key:$value" }.toSeq.mkString("{", ",", "}")
|
||||
assert(r10.get(15).getStringVal.getValue ===
|
||||
toHiveString((Row.builder().addField(9.toString, 9).build(), ROW)))
|
||||
String.format("{foo=\"%s\",bar=%s}", "9", mapStr))
|
||||
assert(r10.get(16).getStringVal.getValue === Array.fill(9)(9.9d).mkString("[", ",", "]"))
|
||||
assert(r10.get(17).getStringVal.getValue === toHiveString((Map(9 -> 9.9d).asJava, MAP)))
|
||||
assert(r10.get(17).getStringVal.getValue === mapStr)
|
||||
assert(r10.get(18).getStringVal.getValue === "{\"9\": 9}")
|
||||
assert(r10.get(19).getStringVal.getValue === "9.9.9.9")
|
||||
assert(r10.get(20).getStringVal.getValue === s"$UUID_PREFIX${uuidSuffix(9)}")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user