From c06e042287deb12b9230c22a2c8e56414545fd11 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 19 Jan 2022 16:49:33 +0800 Subject: [PATCH] [KYUUBI #1784][FOLLOWUP][FLINK] Support float type ### _Why are the changes needed?_ ```sql SELECT cast(0.1 as float) ``` Before this PR, this query will throw an exception when we ran it with the Flink engine.
Exception message ``` org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException at org.apache.kyuubi.shade.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:376) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:453) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37) at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) at org.apache.kyuubi.shade.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.recv_FetchResults(TCLIService.java:559) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.FetchResults(TCLIService.java:546) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.kyuubi.jdbc.hive.KyuubiConnection$SynchronizedHandler.invoke(KyuubiConnection.java:1611) at com.sun.proxy.$Proxy20.FetchResults(Unknown Source) at org.apache.kyuubi.jdbc.hive.KyuubiQueryResultSet.next(KyuubiQueryResultSet.java:367) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$5(FlinkOperationSuite.scala:53) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$5$adapted(FlinkOperationSuite.scala:50) at org.apache.kyuubi.operation.JDBCTestHelper.$anonfun$withMultipleConnectionJdbcStatement$3(JDBCTestHelper.scala:60) at org.apache.kyuubi.operation.JDBCTestHelper.$anonfun$withMultipleConnectionJdbcStatement$3$adapted(JDBCTestHelper.scala:60) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.kyuubi.operation.JDBCTestHelper.withMultipleConnectionJdbcStatement(JDBCTestHelper.scala:60) at org.apache.kyuubi.operation.JDBCTestHelper.withMultipleConnectionJdbcStatement$(JDBCTestHelper.scala:54) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withMultipleConnectionJdbcStatement(FlinkOperationSuite.scala:34) at org.apache.kyuubi.operation.JDBCTestHelper.withJdbcStatement(JDBCTestHelper.scala:98) at org.apache.kyuubi.operation.JDBCTestHelper.withJdbcStatement$(JDBCTestHelper.scala:97) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withJdbcStatement(FlinkOperationSuite.scala:34) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.$anonfun$new$4(FlinkOperationSuite.scala:50) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) at org.apache.kyuubi.KyuubiFunSuite.withFixture(KyuubiFunSuite.scala:61) at org.apache.kyuubi.KyuubiFunSuite.withFixture$(KyuubiFunSuite.scala:55) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.withFixture(FlinkOperationSuite.scala:34) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FlinkOperationSuite.scala:34) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.runTest(FlinkOperationSuite.scala:34) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.org$scalatest$BeforeAndAfterAll$$super$run(FlinkOperationSuite.scala:34) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.kyuubi.engine.flink.operation.FlinkOperationSuite.run(FlinkOperationSuite.scala:34) at org.scalatest.Suite.callExecuteOnSuite$1(Suite.scala:1175) at org.scalatest.Suite.$anonfun$runNestedSuites$1(Suite.scala:1222) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.scalatest.Suite.runNestedSuites(Suite.scala:1220) at org.scalatest.Suite.runNestedSuites$(Suite.scala:1154) at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30) at org.scalatest.Suite.run(Suite.scala:1109) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1322) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1316) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1316) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1482) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971) at org.scalatest.tools.Runner$.main(Runner.scala:775) ```
### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1801 from cfmcgrady/kyuubi-1784-flink. Closes #1784 17623e89 [Fu Chen] fix style 6385c417 [Fu Chen] fix Authored-by: Fu Chen Signed-off-by: Cheng Pan --- .../org/apache/kyuubi/engine/flink/schema/RowSet.scala | 6 ++++-- .../engine/flink/operation/FlinkOperationSuite.scala | 9 +++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala index 5f2baf1a6..8b0db93f2 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala @@ -112,7 +112,8 @@ object RowSet { case _: FloatType => val tDoubleValue = new TDoubleValue if (row.getField(ordinal) != null) { - tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float]) + val doubleValue = lang.Double.valueOf(row.getField(ordinal).asInstanceOf[Float].toString) + tDoubleValue.setValue(doubleValue) } TColumnValue.doubleVal(tDoubleValue) case _: DoubleType => @@ -167,7 +168,8 @@ object RowSet { val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L) TColumn.i64Val(new TI64Column(values, nulls)) case _: FloatType => - val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0) + val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.0f) + .asScala.map(n => lang.Double.valueOf(n.toString)).asJava TColumn.doubleVal(new TDoubleColumn(values, nulls)) case _: DoubleType => val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.0) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 0ca437c89..4e472bc2f 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -593,6 +593,15 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { } } + test("execute statement - select float") { + withJdbcStatement()({ statement => + val resultSet = statement.executeQuery("SELECT cast(0.1 as float)") + assert(resultSet.next()) + assert(resultSet.getString(1) == "0.1") + assert(resultSet.getFloat(1) == 0.1f) + }) + } + test("execute statement - show functions") { withJdbcStatement() { statement => val resultSet = statement.executeQuery("show functions")