[KYUUBI #1778] Support Flink Set/Reset Operations

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_

Set/Reset operations are crucial for tuning Flink SQL jobs and enabling/disabling features, but they're not executed as SQL statements in Flink, thus can't be supported by the current ExecuteStatement implementation. We should extend ExecuteStatement to support these operations.

This is a sub-task of KPIP-2 #1322.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1822 from link3280/feature/KYUUBI-1778.

Closes #1778

80b2de0a [Paul Lin] [KYUUBI #1825] Return key-value properties in two separate columns
786f3c58 [Paul Lin] [KYUUBI #1778] Support Flink reset operations
6513a317 [Paul Lin] [KYUUBI #1778] Support Flink set operations

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Paul Lin 2022-01-25 10:06:53 +08:00 committed by Kent Yao
parent b47e1cdc8b
commit ef8de3723e
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
3 changed files with 119 additions and 2 deletions

View File

@@ -55,4 +55,19 @@ public class OperationUtil {
.data(data.toArray(new Row[0]))
.build();
}
/**
 * Builds a minimal result set holding a single "OK" row, returned to signal
 * that a SQL command was executed successfully.
 *
 * <p>A fresh ResultSet instance is created on every call, because a ResultSet
 * is stateful (it carries a cursor) and therefore must not be shared.
 *
 * @return A new single-row, single-column result set containing "OK".
 */
public static ResultSet successResultSet() {
  Row[] okRow = new Row[] {Row.of("OK")};
  return ResultSet.builder()
      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
      .columns(Column.physical("result", DataTypes.STRING()))
      .data(okRow)
      .build();
}
}

View File

@@ -17,19 +17,22 @@
package org.apache.kyuubi.engine.flink.operation
import java.util
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
import org.apache.flink.table.operations.{Operation, QueryOperation}
import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
import org.apache.flink.types.Row
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.result.{OperationUtil, ResultSet}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -100,6 +103,8 @@ class ExecuteStatement(
val operation = executor.parseStatement(sessionId, statement)
operation match {
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
case setOperation: SetOperation => runSetOperation(setOperation)
case resetOperation: ResetOperation => runResetOperation(resetOperation)
case operation: Operation => runOperation(operation)
}
} catch {
@@ -139,6 +144,64 @@ class ExecuteStatement(
setState(OperationState.FINISHED)
}
/**
 * Executes a Flink SET operation.
 *
 * With a key (and optionally a value), updates the session property and echoes
 * the effective (key, value) pair back to the client. Without a key, lists all
 * session properties sorted by key.
 *
 * Always finishes the operation in FINISHED state; errors propagate to the caller.
 */
private def runSetOperation(setOperation: SetOperation): Unit = {
  if (setOperation.getKey.isPresent) {
    val key: String = setOperation.getKey.get.trim
    // "SET key = value": apply the new value before reading the config back.
    if (setOperation.getValue.isPresent) {
      val newValue: String = setOperation.getValue.get.trim
      executor.setSessionProperty(sessionId, key, newValue)
    }
    // Echo the effective value so the client sees what was actually applied;
    // an unset key is reported as an empty string.
    val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
    resultSet = keyValueResultSet(Array(Row.of(key, value)))
  } else {
    // Bare "SET": list every session property, sorted by key for stable output.
    // Sorting an empty collection yields an empty array, so no special-casing
    // of the empty-config situation is needed.
    val rows = executor.getSessionConfigMap(sessionId).asScala.toSeq
      .sortBy { case (key, _) => key }
      .map { case (key, value) => Row.of(key, value) }
      .toArray
    resultSet = keyValueResultSet(rows)
  }
  setState(OperationState.FINISHED)
}

/** Builds a two-string-column (key, value) result set from the given rows. */
private def keyValueResultSet(rows: Array[Row]): ResultSet = {
  ResultSet.builder
    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
    .columns(
      Column.physical("key", DataTypes.STRING()),
      Column.physical("value", DataTypes.STRING()))
    .data(rows)
    .build
}
/**
 * Executes a Flink RESET operation: restores a single session property to its
 * default when a key is given, otherwise restores all session properties.
 * Returns a simple "OK" result set and finishes the operation.
 */
private def runResetOperation(resetOperation: ResetOperation): Unit = {
  val maybeKey = resetOperation.getKey
  if (!maybeKey.isPresent) {
    // No key supplied: reset every session property.
    executor.resetSessionProperties(sessionId)
  } else {
    // Reset only the named property.
    executor.resetSessionProperty(sessionId, maybeKey.get())
  }
  resultSet = OperationUtil.successResultSet()
  setState(OperationState.FINISHED)
}
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
result.await()

View File

@@ -685,4 +685,43 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
assert(resultSet.getLong(1) == -1L)
})
}
test("execute statement - set properties") {
  withMultipleConnectionJdbcStatement()({ statement =>
    val rs = statement.executeQuery("set table.dynamic-table-options.enabled = true")
    // A SET with a key/value returns a two-column (key, value) table.
    val meta = rs.getMetaData
    assert(meta.getColumnName(1) == "key")
    assert(meta.getColumnName(2) == "value")
    // Exactly the property that was set is echoed back with its new value.
    assert(rs.next())
    assert(rs.getString(1) == "table.dynamic-table-options.enabled")
    assert(rs.getString(2) == "true")
  })
}
test("execute statement - show properties") {
  withMultipleConnectionJdbcStatement()({ statement =>
    // A bare SET lists all session properties as (key, value) rows.
    val rs = statement.executeQuery("set")
    val meta = rs.getMetaData
    assert(meta.getColumnName(1) == "key")
    assert(meta.getColumnName(2) == "value")
    // The listing is non-empty: at least one property exists in a fresh session.
    assert(rs.next())
  })
}
test("execute statement - reset property") {
  withMultipleConnectionJdbcStatement()({ statement =>
    statement.executeQuery("set pipeline.jars = my.jar")
    statement.executeQuery("reset pipeline.jars")
    val rs = statement.executeQuery("set")
    // Flink does not support set key without value currently,
    // thus read all rows to find the desired one
    var resetSeen = false
    while (rs.next()) {
      val isTarget = rs.getString(1) == "pipeline.jars" && rs.getString(2) == ""
      resetSeen = resetSeen || isTarget
    }
    assert(resetSeen)
  })
}
}