From dcb444e2d8e44b30eb6cad74ae8b75b2e54b52da Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Wed, 3 May 2023 21:51:21 +0800 Subject: [PATCH] [KYUUBI #4495] Support Flink job management statements ### _Why are the changes needed?_ Support Flink job management statements. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4774 from link3280/KYUUBI-4495. Closes #4495 a4aaebcbb [Paul Lin] [KYUUBI #4495] Adjust the order of tests 225a6cdbd [Paul Lin] [KYUUBI #4495] Increase the number of taskmanagers in the mini cluster 67935ac24 [Paul Lin] [KYUUBI #4495] Wait jobs to get ready for show job statements 9c4ce1d6e [Paul Lin] [KYUUBI #4495] Fix show jobs assertion error ab3113cab [Paul Lin] [KYUUBI #4495] Support Flink job management statements Authored-by: Paul Lin Signed-off-by: Cheng Pan --- .../flink/WithFlinkSQLEngineLocal.scala | 1 + .../engine/flink/WithFlinkTestResources.scala | 5 +- .../flink/operation/FlinkOperationSuite.scala | 70 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala index 0001f31ae..92c1bcd83 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala @@ -184,6 +184,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources val cfg = new MiniClusterConfiguration.Builder() .setConfiguration(flinkConfig) .setNumSlotsPerTaskManager(1) + .setNumTaskManagers(2) .build miniCluster = new MiniCluster(cfg) miniCluster.start() diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala index 3ea02774e..3b1d65cb2 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala @@ -41,6 +41,9 @@ trait WithFlinkTestResources { GENERATED_UDF_CLASS, GENERATED_UDF_CODE) + protected val savepointDir: File = Utils.createTempDir("savepoints").toFile + protected val testExtraConf: Map[String, String] = Map( - "flink.pipeline.name" -> "test-job") + "flink.pipeline.name" -> "test-job", + "flink.state.savepoints.dir" -> savepointDir.toURI.toString) } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 00e26c528..39d17aa7b 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.engine.flink.operation +import java.nio.file.Paths import java.sql.DatabaseMetaData import java.util.UUID @@ -33,6 +34,7 @@ import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources} import org.apache.kyuubi.engine.flink.result.Constants import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar import org.apache.kyuubi.jdbc.hive.KyuubiStatement +import org.apache.kyuubi.jdbc.hive.common.TimestampTZ import org.apache.kyuubi.operation.HiveJDBCTestHelper import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -632,6 +634,62 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest } } + test("execute statement - show/stop jobs") { + if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) { + withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) { + withMultipleConnectionJdbcStatement()({ statement => + statement.executeQuery( + "create table tbl_a (a int) with (" + + "'connector' = 'datagen', " + + "'rows-per-second'='10')") + statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')") + val insertResult1 = statement.executeQuery("insert into tbl_b select * from tbl_a") + assert(insertResult1.next()) + val jobId1 = insertResult1.getString(1) + + Thread.sleep(5000) + + val showResult = statement.executeQuery("show jobs") + val metadata = showResult.getMetaData + assert(metadata.getColumnName(1) === "job id") + assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR) + assert(metadata.getColumnName(2) === "job name") + assert(metadata.getColumnType(2) === java.sql.Types.VARCHAR) + assert(metadata.getColumnName(3) === "status") + assert(metadata.getColumnType(3) === java.sql.Types.VARCHAR) + assert(metadata.getColumnName(4) === "start time") + assert(metadata.getColumnType(4) === java.sql.Types.OTHER) + + var isFound = false + while (showResult.next()) { + if (showResult.getString(1) === jobId1) { + isFound = true + assert(showResult.getString(2) === "test-job") + assert(showResult.getString(3) === "RUNNING") + assert(showResult.getObject(4).isInstanceOf[TimestampTZ]) + } + } + assert(isFound) + + val stopResult1 = statement.executeQuery(s"stop job '$jobId1'") + assert(stopResult1.next()) + assert(stopResult1.getString(1) === "OK") + + val selectResult = statement.executeQuery("select * from tbl_a") + val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId + assert(jobId2 !== null) + while (!selectResult.next()) { + Thread.sleep(1000L) + } + val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint") + assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path")) + assert(stopResult2.next()) + assert(Paths.get(stopResult2.getString(1)).getFileName.toString.startsWith("savepoint-")) + }) + } + } + } + test("execute statement - select column name with dots") { withJdbcStatement() { statement => val resultSet = statement.executeQuery("select 'tmp.hello'") @@ -994,7 +1052,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest assert(metadata.getColumnName(1) === "job id") assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR) assert(resultSet.next()) - assert(resultSet.getString(1).length == 32) + val jobId = resultSet.getString(1) + assert(jobId.length == 32) + + if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) { + val stopResult = statement.executeQuery(s"stop job '$jobId'") + assert(stopResult.next()) + assert(stopResult.getString(1) === "OK") + } }) } @@ -1072,7 +1137,8 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest test("ensure result max rows") { withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) { withJdbcStatement() { statement => - statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')") + statement.execute("create table tbl_src (a bigint) with (" + + "'connector' = 'datagen', 'number-of-rows' = '1000')") val resultSet = statement.executeQuery(s"select a from tbl_src") var rows = 0 while (resultSet.next()) {