[KYUUBI #3560] Flink SQL engine supports run DDL across versions

### _Why are the changes needed?_

Followup #3230

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3560 from df-Liu/flink_ddl.

Closes #3560

0dbdfb3f [df_liu] flink ddl

Authored-by: df_liu <df_liu@trip.com>
Signed-off-by: Shaoyun Chen <csy@apache.org>
This commit is contained in:
df_liu 2022-09-27 15:28:37 +08:00 committed by Shaoyun Chen
parent ef55e4a7d2
commit d06c656cc6
No known key found for this signature in database
GPG Key ID: 1E17EF9E7F8E83AA
2 changed files with 15 additions and 2 deletions

View File

@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.flink.api.common.JobID
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.api.{ResultKind, TableResult}
import org.apache.flink.table.client.gateway.TypedResult
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
@ -154,7 +154,12 @@ class ExecuteStatement(
}
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
// FLINK-24461 executeOperation method changes the return type
// from TableResult to TableResultInternal
val executeOperation = DynMethods.builder("executeOperation")
.impl(executor.getClass, classOf[String], classOf[Operation])
.build(executor)
val result = executeOperation.invoke[TableResult](sessionId, operation)
jobId = result.getJobClient.asScala.map(_.getJobID)
result.await()
resultSet = ResultSet.fromTableResult(result)

View File

@ -48,6 +48,14 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
}
}
test("execute statement - create/alter/drop table") {
withJdbcStatement()({ statement =>
statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
assert(statement.execute("alter table tbl_a rename to tbl_b"))
assert(statement.execute("drop table tbl_b"))
})
}
test("execute statement - select column name with dots") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 'tmp.hello'")