From d06c656cc648bbcf96d96baaa6a2faac0d7844be Mon Sep 17 00:00:00 2001 From: df_liu Date: Tue, 27 Sep 2022 15:28:37 +0800 Subject: [PATCH] [KYUUBI #3560] Flink SQL engine supports run DDL across versions ### _Why are the changes needed?_ Followup #3230 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3560 from df-Liu/flink_ddl. Closes #3560 0dbdfb3f [df_liu] flink ddl Authored-by: df_liu Signed-off-by: Shaoyun Chen --- .../kyuubi/engine/flink/operation/ExecuteStatement.scala | 9 +++++++-- .../kyuubi/it/flink/operation/FlinkOperationSuite.scala | 8 ++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 374f62f18..182e3e3ef 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.flink.api.common.JobID -import org.apache.flink.table.api.ResultKind +import org.apache.flink.table.api.{ResultKind, TableResult} import org.apache.flink.table.client.gateway.TypedResult import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData} import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData} @@ -154,7 +154,12 @@ class ExecuteStatement( } private def runOperation(operation: Operation): Unit = { - val result = executor.executeOperation(sessionId, operation) + // FLINK-24461 executeOperation method changes the return type + // from TableResult to TableResultInternal + val executeOperation = DynMethods.builder("executeOperation") + .impl(executor.getClass, classOf[String], classOf[Operation]) + .build(executor) + val result = executeOperation.invoke[TableResult](sessionId, operation) jobId = result.getJobClient.asScala.map(_.getJobID) result.await() resultSet = ResultSet.fromTableResult(result) diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala index b33669702..d4902d461 100644 --- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala +++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala @@ -48,6 +48,14 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster } } + test("execute statement - create/alter/drop table") { + withJdbcStatement()({ statement => + statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')") + assert(statement.execute("alter table tbl_a rename to tbl_b")) + assert(statement.execute("drop table tbl_b")) + }) + } + test("execute statement - select column name with dots") { withJdbcStatement() { statement => val resultSet = statement.executeQuery("select 'tmp.hello'")