diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 374f62f18..182e3e3ef 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.flink.api.common.JobID -import org.apache.flink.table.api.ResultKind +import org.apache.flink.table.api.{ResultKind, TableResult} import org.apache.flink.table.client.gateway.TypedResult import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData} import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData} @@ -154,7 +154,12 @@ class ExecuteStatement( } private def runOperation(operation: Operation): Unit = { - val result = executor.executeOperation(sessionId, operation) + // FLINK-24461 executeOperation method changes the return type + // from TableResult to TableResultInternal + val executeOperation = DynMethods.builder("executeOperation") + .impl(executor.getClass, classOf[String], classOf[Operation]) + .build(executor) + val result = executeOperation.invoke[TableResult](sessionId, operation) jobId = result.getJobClient.asScala.map(_.getJobID) result.await() resultSet = ResultSet.fromTableResult(result) diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala index b33669702..d4902d461 100644 --- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala +++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala @@ -48,6 +48,14 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster } } + test("execute statement - create/alter/drop table") { + withJdbcStatement()({ statement => + statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')") + assert(statement.execute("alter table tbl_a rename to tbl_b")) + assert(statement.execute("drop table tbl_b")) + }) + } + test("execute statement - select column name with dots") { withJdbcStatement() { statement => val resultSet = statement.executeQuery("select 'tmp.hello'")