From ef8de3723e91f907b363bd12d9d99596175123d2 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Tue, 25 Jan 2022 10:06:53 +0800 Subject: [PATCH] [KYUUBI #1778] Support Flink Set/Reset Operations ### _Why are the changes needed?_ Set/Reset operations are crucial for tuning Flink SQL jobs and enabling/disabling features, but they're not executed as SQL statements in Flink, thus can't be supported by the current ExecuteStatement implementation. We should extend ExecuteStatement to support these operations. This is a sub-task of KPIP-2 #1322. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1822 from link3280/feature/KYUUBI-1778. Closes #1778 80b2de0a [Paul Lin] [KYUUBI #1825] Return key-value properties in two separate columns 786f3c58 [Paul Lin] [KYUUBI #1778] Support Flink reset operations 6513a317 [Paul Lin] [KYUUBI #1778] Support Flink set operations Authored-by: Paul Lin Signed-off-by: Kent Yao --- .../engine/flink/result/OperationUtil.java | 15 +++++ .../flink/operation/ExecuteStatement.scala | 67 ++++++++++++++++++- .../flink/operation/FlinkOperationSuite.scala | 39 +++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java index 8620ab787..b2d66351a 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java +++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java @@ -55,4 +55,19 @@ public class OperationUtil { .data(data.toArray(new Row[0])) 
.build(); } + + /** + * Build a simple result with OK message. Returned when SQL commands are executed successfully. + * Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its + * cursor). + * + * @return A simple result with OK message. + */ + public static ResultSet successResultSet() { + return ResultSet.builder() + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns(Column.physical("result", DataTypes.STRING())) + .data(new Row[] {Row.of("OK")}) + .build(); + } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index f710b5663..85decf64c 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -17,19 +17,22 @@ package org.apache.kyuubi.engine.flink.operation +import java.util import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting -import org.apache.flink.table.api.ResultKind +import org.apache.flink.table.api.{DataTypes, ResultKind} +import org.apache.flink.table.catalog.Column import org.apache.flink.table.client.gateway.{Executor, TypedResult} import org.apache.flink.table.operations.{Operation, QueryOperation} +import org.apache.flink.table.operations.command.{ResetOperation, SetOperation} import org.apache.flink.types.Row import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.engine.flink.result.ResultSet +import org.apache.kyuubi.engine.flink.result.{OperationUtil, ResultSet} import org.apache.kyuubi.operation.{OperationState, OperationType} 
import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -100,6 +103,8 @@ class ExecuteStatement( val operation = executor.parseStatement(sessionId, statement) operation match { case queryOperation: QueryOperation => runQueryOperation(queryOperation) + case setOperation: SetOperation => runSetOperation(setOperation) + case resetOperation: ResetOperation => runResetOperation(resetOperation) case operation: Operation => runOperation(operation) } } catch { @@ -139,6 +144,64 @@ class ExecuteStatement( setState(OperationState.FINISHED) } + private def runSetOperation(setOperation: SetOperation): Unit = { + if (setOperation.getKey.isPresent) { + val key: String = setOperation.getKey.get.trim + + if (setOperation.getValue.isPresent) { + val newValue: String = setOperation.getValue.get.trim + executor.setSessionProperty(sessionId, key, newValue) + } + + val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "") + resultSet = ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns( + Column.physical("key", DataTypes.STRING()), + Column.physical("value", DataTypes.STRING())) + .data(Array(Row.of(key, value))) + .build + } else { + // show all properties if set without key + val properties: util.Map[String, String] = executor.getSessionConfigMap(sessionId) + + val entries = ArrayBuffer.empty[Row] + properties.forEach((key, value) => entries.append(Row.of(key, value))) + + if (entries.nonEmpty) { + val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String]) + resultSet = ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns( + Column.physical("key", DataTypes.STRING()), + Column.physical("value", DataTypes.STRING())) + .data(prettyEntries.toArray) + .build + } else { + resultSet = ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns( + Column.physical("key", DataTypes.STRING()), + Column.physical("value", DataTypes.STRING())) + .data(Array[Row]()) + 
.build + } + } + setState(OperationState.FINISHED) + } + + private def runResetOperation(resetOperation: ResetOperation): Unit = { + if (resetOperation.getKey.isPresent) { + // reset the given property + executor.resetSessionProperty(sessionId, resetOperation.getKey.get()) + } else { + // reset all properties + executor.resetSessionProperties(sessionId) + } + resultSet = OperationUtil.successResultSet() + setState(OperationState.FINISHED) + } + private def runOperation(operation: Operation): Unit = { val result = executor.executeOperation(sessionId, operation) result.await() diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 4e472bc2f..ac3f009e4 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -685,4 +685,43 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { assert(resultSet.getLong(1) == -1L) }) } + + test("execute statement - set properties") { + withMultipleConnectionJdbcStatement()({ statement => + val resultSet = statement.executeQuery("set table.dynamic-table-options.enabled = true") + val metadata = resultSet.getMetaData + assert(metadata.getColumnName(1) == "key") + assert(metadata.getColumnName(2) == "value") + assert(resultSet.next()) + assert(resultSet.getString(1) == "table.dynamic-table-options.enabled") + assert(resultSet.getString(2) == "true") + }) + } + + test("execute statement - show properties") { + withMultipleConnectionJdbcStatement()({ statement => + val resultSet = statement.executeQuery("set") + val metadata = resultSet.getMetaData + assert(metadata.getColumnName(1) == "key") + assert(metadata.getColumnName(2) 
== "value") + assert(resultSet.next()) + }) + } + + test("execute statement - reset property") { + withMultipleConnectionJdbcStatement()({ statement => + statement.executeQuery("set pipeline.jars = my.jar") + statement.executeQuery("reset pipeline.jars") + val resultSet = statement.executeQuery("set") + // Flink does not support set key without value currently, + // thus read all rows to find the desired one + var success = false + while (resultSet.next()) { + if (resultSet.getString(1) == "pipeline.jars" && resultSet.getString(2) == "") { + success = true + } + } + assert(success) + }) + } }