[KYUUBI #1784][FOLLOWUP][FLINK] Support float type

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

```sql
SELECT cast(0.1 as float)
```

Before this PR, this query will throw an exception when we ran it with the Flink engine.

<details>
  <summary>Exception message</summary>

```
org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:376)
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:453)
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435)
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
        at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
        at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
        at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
        at org.apache.kyuubi.shade.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
        at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.recv_FetchResults(TCLIService.java:559)
        at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.FetchResults(TCLIService.java:546)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.kyuubi.jdbc.hive.KyuubiConnection$SynchronizedHandler.invoke(KyuubiConnection.java:1611)
        at com.sun.proxy.$Proxy20.FetchResults(Unknown Source)
        at org.apache.kyuubi.jdbc.hive.KyuubiQueryResultSet.next(KyuubiQueryResultSet.java:367)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$5(FlinkOperationSuite.scala:53)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$5$adapted(FlinkOperationSuite.scala:50)
        at org.apache.kyuubi.operation.JDBCTestHelper.$anonfun$withMultipleConnectionJdbcStatement$3(JDBCTestHelper.scala:60)
        at org.apache.kyuubi.operation.JDBCTestHelper.$anonfun$withMultipleConnectionJdbcStatement$3$adapted(JDBCTestHelper.scala:60)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.kyuubi.operation.JDBCTestHelper.withMultipleConnectionJdbcStatement(JDBCTestHelper.scala:60)
        at org.apache.kyuubi.operation.JDBCTestHelper.withMultipleConnectionJdbcStatement$(JDBCTestHelper.scala:54)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withMultipleConnectionJdbcStatement(FlinkOperationSuite.scala:34)
        at org.apache.kyuubi.operation.JDBCTestHelper.withJdbcStatement(JDBCTestHelper.scala:98)
        at org.apache.kyuubi.operation.JDBCTestHelper.withJdbcStatement$(JDBCTestHelper.scala:97)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withJdbcStatement(FlinkOperationSuite.scala:34)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$4(FlinkOperationSuite.scala:50)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
        at org.apache.kyuubi.KyuubiFunSuite.withFixture(KyuubiFunSuite.scala:61)
        at org.apache.kyuubi.KyuubiFunSuite.withFixture$(KyuubiFunSuite.scala:55)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withFixture(FlinkOperationSuite.scala:34)
        at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
        at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FlinkOperationSuite.scala:34)
        at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
        at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.runTest(FlinkOperationSuite.scala:34)
        at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
        at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
        at org.scalatest.Suite.run(Suite.scala:1112)
        at org.scalatest.Suite.run$(Suite.scala:1094)
        at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
        at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
        at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
        at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.org$scalatest$BeforeAndAfterAll$$super$run(FlinkOperationSuite.scala:34)
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.run(FlinkOperationSuite.scala:34)
        at org.scalatest.Suite.callExecuteOnSuite$1(Suite.scala:1175)
        at org.scalatest.Suite.$anonfun$runNestedSuites$1(Suite.scala:1222)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at org.scalatest.Suite.runNestedSuites(Suite.scala:1220)
        at org.scalatest.Suite.runNestedSuites$(Suite.scala:1154)
        at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
        at org.scalatest.Suite.run(Suite.scala:1109)
        at org.scalatest.Suite.run$(Suite.scala:1094)
        at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1322)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1316)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1316)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1482)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
        at org.scalatest.tools.Runner$.main(Runner.scala:775)
```

 </details>

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1801 from cfmcgrady/kyuubi-1784-flink.

Closes #1784

17623e89 [Fu Chen] fix style
6385c417 [Fu Chen] fix

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Fu Chen 2022-01-19 16:49:33 +08:00 committed by Cheng Pan
parent c65df0fa36
commit c06e042287
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
2 changed files with 13 additions and 2 deletions

View File

@ -112,7 +112,8 @@ object RowSet {
case _: FloatType =>
val tDoubleValue = new TDoubleValue
if (row.getField(ordinal) != null) {
tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
val doubleValue = lang.Double.valueOf(row.getField(ordinal).asInstanceOf[Float].toString)
tDoubleValue.setValue(doubleValue)
}
TColumnValue.doubleVal(tDoubleValue)
case _: DoubleType =>
@ -167,7 +168,8 @@ object RowSet {
val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L)
TColumn.i64Val(new TI64Column(values, nulls))
case _: FloatType =>
val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0)
val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.0f)
.asScala.map(n => lang.Double.valueOf(n.toString)).asJava
TColumn.doubleVal(new TDoubleColumn(values, nulls))
case _: DoubleType =>
val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0)

View File

@ -593,6 +593,15 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
test("execute statement - select float") {
withJdbcStatement()({ statement =>
val resultSet = statement.executeQuery("SELECT cast(0.1 as float)")
assert(resultSet.next())
assert(resultSet.getString(1) == "0.1")
assert(resultSet.getFloat(1) == 0.1f)
})
}
test("execute statement - show functions") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("show functions")