[KYUUBI #4495] Support Flink job management statements

### _Why are the changes needed?_

Support Flink job management statements, namely `SHOW JOBS` and `STOP JOB '<job_id>' [WITH SAVEPOINT]`, which Flink SQL provides since Flink 1.17. This lets users list and stop Flink jobs directly through Kyuubi.
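
As an illustration, the new statements are reachable through any JDBC client connected to a Flink-backed Kyuubi engine. A minimal sketch, assuming the Kyuubi Hive JDBC driver is on the classpath and a server listens at `localhost:10009` (the endpoint is hypothetical):

```scala
import java.sql.DriverManager

object FlinkJobManagementExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical endpoint: adjust host/port to your Kyuubi deployment.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    val stmt = conn.createStatement()
    try {
      // List the jobs on the Flink cluster; the result columns are
      // "job id", "job name", "status", and "start time".
      val jobs = stmt.executeQuery("SHOW JOBS")
      while (jobs.next()) {
        println(s"${jobs.getString(1)}  ${jobs.getString(2)}  ${jobs.getString(3)}")
      }
      // Stop a job gracefully; the WITH SAVEPOINT variant returns the
      // savepoint path in a column named "savepoint path".
      // stmt.executeQuery("STOP JOB '<job id>' WITH SAVEPOINT")
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```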

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4774 from link3280/KYUUBI-4495.

Closes #4495

a4aaebcbb [Paul Lin] [KYUUBI #4495] Adjust the order of tests
225a6cdbd [Paul Lin] [KYUUBI #4495] Increase the number of taskmanagers in the mini cluster
67935ac24 [Paul Lin] [KYUUBI #4495] Wait jobs to get ready for show job statements
9c4ce1d6e [Paul Lin] [KYUUBI #4495] Fix show jobs assertion error
ab3113cab [Paul Lin] [KYUUBI #4495] Support Flink job management statements

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Authored by Paul Lin on 2023-05-03 21:51:21 +08:00; committed by Cheng Pan
parent 66de0ad8a0
commit dcb444e2d8
3 changed files with 73 additions and 3 deletions

The Flink mini cluster used by the local engine tests now runs two TaskManagers (two task slots in total), since the new show/stop-jobs test keeps a streaming insert running while a second streaming query executes:

@@ -184,6 +184,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
     val cfg = new MiniClusterConfiguration.Builder()
       .setConfiguration(flinkConfig)
       .setNumSlotsPerTaskManager(1)
+      .setNumTaskManagers(2)
       .build
     miniCluster = new MiniCluster(cfg)
     miniCluster.start()

WithFlinkTestResources gains a temporary savepoint directory, wired into the shared test conf so that stop-with-savepoint statements have a place to write savepoints:

@@ -41,6 +41,9 @@ trait WithFlinkTestResources {
     GENERATED_UDF_CLASS,
     GENERATED_UDF_CODE)
 
+  protected val savepointDir: File = Utils.createTempDir("savepoints").toFile
+
   protected val testExtraConf: Map[String, String] = Map(
-    "flink.pipeline.name" -> "test-job")
+    "flink.pipeline.name" -> "test-job",
+    "flink.state.savepoints.dir" -> savepointDir.toURI.toString)
 }
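
Context for the new `flink.state.savepoints.dir` entry: session configurations carrying the `flink.` prefix are handed to the Flink engine as Flink configuration with the prefix stripped, so this entry sets Flink's `state.savepoints.dir` and gives `stop job ... with savepoint` a default savepoint location. A minimal sketch of that mapping, assuming the prefix-stripping behavior just described (the real translation lives in the engine, not in this patch):

```scala
// Illustrative sketch only: how "flink."-prefixed session options would map
// onto Flink configuration keys. Not the engine's actual implementation.
val sessionConf = Map(
  "flink.pipeline.name" -> "test-job",
  "flink.state.savepoints.dir" -> "file:/tmp/savepoints")

val flinkConf: Map[String, String] = sessionConf.collect {
  case (key, value) if key.startsWith("flink.") =>
    key.stripPrefix("flink.") -> value
}
// flinkConf: Map("pipeline.name" -> "test-job",
//                "state.savepoints.dir" -> "file:/tmp/savepoints")
```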

FlinkOperationSuite picks up two imports for the new assertions (`java.nio.file.Paths` for the savepoint path check, `TimestampTZ` for the job start time column):

@@ -17,6 +17,7 @@
 package org.apache.kyuubi.engine.flink.operation
 
+import java.nio.file.Paths
 import java.sql.DatabaseMetaData
 import java.util.UUID
@@ -33,6 +34,7 @@ import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources}
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
+import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
The core addition is a test exercising `show jobs` and `stop job` (with and without savepoint), gated on Flink 1.17, which introduced these statements:

@@ -632,6 +634,62 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
     }
   }
 
+  test("execute statement - show/stop jobs") {
+    if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+      withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+        withMultipleConnectionJdbcStatement()({ statement =>
+          statement.executeQuery(
+            "create table tbl_a (a int) with (" +
+              "'connector' = 'datagen', " +
+              "'rows-per-second'='10')")
+          statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')")
+          val insertResult1 = statement.executeQuery("insert into tbl_b select * from tbl_a")
+          assert(insertResult1.next())
+          val jobId1 = insertResult1.getString(1)
+
+          Thread.sleep(5000)
+
+          val showResult = statement.executeQuery("show jobs")
+          val metadata = showResult.getMetaData
+          assert(metadata.getColumnName(1) === "job id")
+          assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
+          assert(metadata.getColumnName(2) === "job name")
+          assert(metadata.getColumnType(2) === java.sql.Types.VARCHAR)
+          assert(metadata.getColumnName(3) === "status")
+          assert(metadata.getColumnType(3) === java.sql.Types.VARCHAR)
+          assert(metadata.getColumnName(4) === "start time")
+          assert(metadata.getColumnType(4) === java.sql.Types.OTHER)
+
+          var isFound = false
+          while (showResult.next()) {
+            if (showResult.getString(1) === jobId1) {
+              isFound = true
+              assert(showResult.getString(2) === "test-job")
+              assert(showResult.getString(3) === "RUNNING")
+              assert(showResult.getObject(4).isInstanceOf[TimestampTZ])
+            }
+          }
+          assert(isFound)
+
+          val stopResult1 = statement.executeQuery(s"stop job '$jobId1'")
+          assert(stopResult1.next())
+          assert(stopResult1.getString(1) === "OK")
+
+          val selectResult = statement.executeQuery("select * from tbl_a")
+          val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId
+          assert(jobId2 !== null)
+          while (!selectResult.next()) {
+            Thread.sleep(1000L)
+          }
+          val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint")
+          assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path"))
+          assert(stopResult2.next())
+          assert(Paths.get(stopResult2.getString(1)).getFileName.toString.startsWith("savepoint-"))
+        })
+      }
+    }
+  }
+
 test("execute statement - select column name with dots") {
   withJdbcStatement() { statement =>
     val resultSet = statement.executeQuery("select 'tmp.hello'")
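
A note on the waiting pattern above: the test parks on fixed `Thread.sleep` calls before inspecting `show jobs`, and polls `selectResult.next()` once per second until the streaming select produces a row. If the fixed sleeps ever prove flaky on slow CI machines, a bounded polling helper is a common alternative; a hypothetical sketch, not part of this patch:

```scala
// Hypothetical helper, not part of this patch: retry a condition until it
// holds or the deadline passes, instead of sleeping for a fixed interval.
def awaitCondition(timeoutMs: Long = 30000L, intervalMs: Long = 1000L)(
    condition: => Boolean): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (!condition) {
    if (System.currentTimeMillis() > deadline) {
      throw new AssertionError(s"condition not met within ${timeoutMs}ms")
    }
    Thread.sleep(intervalMs)
  }
}

// Usage sketch: wait until `show jobs` reports the submitted job as RUNNING
// before asserting on its metadata.
```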
The existing batch insert test now captures the job id and, on Flink 1.17 or later, also issues `stop job` against it:

@@ -994,7 +1052,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       assert(metadata.getColumnName(1) === "job id")
       assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
       assert(resultSet.next())
-      assert(resultSet.getString(1).length == 32)
+      val jobId = resultSet.getString(1)
+      assert(jobId.length == 32)
+
+      if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+        val stopResult = statement.executeQuery(s"stop job '$jobId'")
+        assert(stopResult.next())
+        assert(stopResult.getString(1) === "OK")
+      }
     })
   }
Lastly, the datagen source in the max-rows test is bounded via 'number-of-rows', so the job completes on its own:

@@ -1072,7 +1137,8 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
   test("ensure result max rows") {
     withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
       withJdbcStatement() { statement =>
-        statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
+        statement.execute("create table tbl_src (a bigint) with (" +
+          "'connector' = 'datagen', 'number-of-rows' = '1000')")
         val resultSet = statement.executeQuery(s"select a from tbl_src")
         var rows = 0
         while (resultSet.next()) {