[KYUUBI #5505][FLINK] Support HELP command

### _Why are the changes needed?_

resolve: #5505

Show available commands when users type 'HELP;' in the beeline.

#### Solution:

- Using `ExtendedParser` parse statement and return Operation of Flink engine.
- Check whether the operation is HelpOperation or not.
- dependency on `flink-table-planner-loader.jar`.

#### **Why not using Flink SQL Client Parser(SqlCommandParserImpl) to obtain the Command enum?**
Flink 1.16's approach:
```
val opt:Optional[Operation] = org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand()
check opt.get() instance of HelpOperation or not
if yes return CliStrings.MESSAGE_HELP
```

Flink 1.17 & 1.18
```
val opt: Optional[Command] = org.apache.flink.table.client.cli.parser.SqlCommandParserImpl.parseStatement()
check opt.get() is Command.HELP or not
if yes return CliStrings.MESSAGE_HELP
```

The `Command` class is added from Flink1.17;
The `SqlCommandParserImpl` package is changed, and the method name is changed from Flink1.17;

This approach requires distinguishing between different Flink versions and maintaining both implementations.
It's more complicated, so abandoned.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

Closes #5585 from YesOrNo828/help.

Closes #5505

e73b15e43 [Xianxun Ye] [KYUUBI #5505] [FLINK] Add job,update,delete,truncate and call statements
5943dd072 [Xianxun Ye] [KYUUBI #5505] [FLINK] Add help messages
fdc2db6ab [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command, fixed Pan's comments
a728048fc [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command, fixed Pan's comments
6323cfb85 [Xianxun Ye] [KYUUBI #5505] [FLINK] Support HELP command

Authored-by: Xianxun Ye <yesorno828423@gmail.com>
Signed-off-by: Paul Lin <paullin3280@gmail.com>
This commit is contained in:
Xianxun Ye 2023-11-07 15:09:57 +08:00 committed by Paul Lin
parent 04cae6ad36
commit d4320e7a6b
6 changed files with 321 additions and 1 deletions

View File

@ -105,6 +105,12 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<scope>provided</scope>
</dependency>
<!-- tests -->
<dependency>
<groupId>org.apache.kyuubi</groupId>

View File

@ -17,10 +17,15 @@
package org.apache.kyuubi.engine.flink.operation
import java.util.Optional
import scala.concurrent.duration.Duration
import org.apache.flink.api.common.JobID
import org.apache.flink.table.api.TableException
import org.apache.flink.table.gateway.api.operation.OperationHandle
import org.apache.flink.table.operations.Operation
import org.apache.flink.table.operations.command.HelpOperation
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
@ -28,6 +33,7 @@ import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.reflect.{DynConstructors, DynFields, DynMethods}
class ExecuteStatement(
session: Session,
@ -59,6 +65,14 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
setState(OperationState.RUNNING)
val operation = parseExtendedStatement(statement)
if (operation.isPresent && operation.get().isInstanceOf[HelpOperation]) {
resultSet = ResultSetUtil.helpMessageResultSet
setState(OperationState.FINISHED)
return
}
val resultFetcher = executor.executeStatement(
new OperationHandle(getHandle.identifier),
statement)
@ -71,4 +85,36 @@ class ExecuteStatement(
shutdownTimeoutMonitor()
}
}
private def parseExtendedStatement(statement: String): Optional[Operation] = {
val plannerModuleClassLoader: ClassLoader = getPlannerModuleClassLoader
val extendedParser: AnyRef =
DynConstructors.builder()
.loader(plannerModuleClassLoader)
.impl("org.apache.flink.table.planner.parse.ExtendedParser")
.build().newInstance()
DynMethods.builder("parse")
.hiddenImpl(extendedParser.getClass, classOf[String])
.buildChecked()
.invoke(extendedParser, statement)
}
private def getPlannerModuleClassLoader: ClassLoader = {
try {
val plannerModule = DynMethods.builder("getInstance")
.hiddenImpl("org.apache.flink.table.planner.loader.PlannerModule")
.buildStaticChecked()
.invoke().asInstanceOf[AnyRef]
DynFields.builder()
.hiddenImpl(plannerModule.getClass, "submoduleClassLoader")
.build[ClassLoader].bind(plannerModule).get
} catch {
case e: Exception =>
throw new TableException(
"Error obtaining Flink planner module ClassLoader. " +
"Make sure a flink-table-planner-loader.jar is on the classpath",
e)
}
}
}

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.result
import scala.collection.mutable.ListBuffer
import org.apache.flink.util.Preconditions
import org.jline.utils.{AttributedString, AttributedStringBuilder, AttributedStyle}
/**
* Utility class that contains all strings for Flink SQL commands and messages.
*/
object CommandStrings {
private val CMD_DESC_DELIMITER = "\t\t"
private class SQLCommandsDescriptions {
private var commandMaxLength = -1
private val commandsDescriptionList = ListBuffer[(String, String)]()
def commandDescription(command: String, description: String): SQLCommandsDescriptions = {
Preconditions.checkState(
command.nonEmpty,
s"content of command must not be empty.",
Seq(): _*)
Preconditions.checkState(
description.nonEmpty,
s"content of command's description must not be empty.",
Seq(): _*)
updateMaxCommandLength(command.length)
commandsDescriptionList += ((command, description))
this
}
private def updateMaxCommandLength(newLength: Int): Unit = {
Preconditions.checkState(newLength > 0)
if (commandMaxLength < newLength) {
commandMaxLength = newLength
}
}
private def formatDescription(input: String): String = {
val maxLineLength = 160
val newLinePrefix = " " * commandMaxLength + CMD_DESC_DELIMITER
val words = input.split(" ")
val (lastLine, lines) = words.foldLeft(("", List[String]())) {
case ((line, lines), word) =>
val newLine = if (line.isEmpty) word else line + " " + word
if (newLine.length > maxLineLength) (word, lines :+ line) else (newLine, lines)
}
(lines :+ lastLine).mkString("\n" + newLinePrefix)
}
def build(): AttributedString = {
val attributedStringBuilder = new AttributedStringBuilder
if (commandsDescriptionList.nonEmpty) {
commandsDescriptionList.foreach {
case (cmd, cmdDesc) =>
attributedStringBuilder
.style(AttributedStyle.DEFAULT.bold())
.append(cmd.padTo(commandMaxLength, " ").mkString)
.append(CMD_DESC_DELIMITER)
.style(AttributedStyle.DEFAULT)
.append(formatDescription(cmdDesc))
.append('\n')
}
}
attributedStringBuilder.toAttributedString
}
}
// scalastyle:off line.size.limit
val MESSAGE_HELP: AttributedString =
new AttributedStringBuilder()
.append("The following commands are available:\n\n")
.append(COMMANDS_DESCRIPTIONS)
.style(AttributedStyle.DEFAULT.underline())
.append("\nHint")
.style(AttributedStyle.DEFAULT)
.append(
": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
// About Documentation Link.
.style(AttributedStyle.DEFAULT)
.append(
"\nThe above list includes only the most frequently used statements.\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
.toAttributedString
def COMMANDS_DESCRIPTIONS: AttributedString =
new SQLCommandsDescriptions()
.commandDescription(
"HELP",
"Prints the available commands or the detailed description of a specified command.")
.commandDescription(
"SET",
"Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
.commandDescription(
"RESET",
"Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.")
.commandDescription(
"INSERT INTO",
"Inserts the results of a SQL SELECT query into a declared table sink.")
.commandDescription(
"INSERT OVERWRITE",
"Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
.commandDescription(
"SELECT",
"Executes a SQL SELECT query on the Flink cluster.")
.commandDescription(
"EXPLAIN",
"Describes the execution plan of a query or table with the given name.")
.commandDescription(
"BEGIN STATEMENT SET",
"Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
.commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
.commandDescription(
"ADD JAR",
"Adds the specified jar file to the submitted jobs' classloader. Syntax: \"ADD JAR '<path_to_filename>.jar'\"")
.commandDescription(
"SHOW JARS",
"Shows the list of user-specified jar dependencies. This list is impacted by the ADD JAR commands.")
.commandDescription(
"SHOW CATALOGS",
"Shows the list of registered catalogs.")
.commandDescription(
"SHOW CURRENT CATALOG",
"Shows the name of the current catalog.")
.commandDescription(
"SHOW DATABASES",
"Shows all databases in the current catalog.")
.commandDescription(
"SHOW CURRENT DATABASE",
"Shows the name of the current database.")
.commandDescription(
"SHOW TABLES",
"Shows all tables for an optionally specified database. Syntax: \"SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]\"")
.commandDescription(
"SHOW CREATE TABLE",
"Shows the CREATE TABLE statement that creates the specified table.")
.commandDescription(
"SHOW COLUMNS",
"Shows all columns of a table with the given name. Syntax: \"SHOW COLUMNS ( FROM | IN ) [[catalog_name.]database.]<table_name> [ [NOT] LIKE <sql_like_pattern>]\"")
.commandDescription(
"SHOW VIEWS",
"Shows all views in the current catalog and the current database.")
.commandDescription(
"SHOW CREATE VIEW",
"Shows the CREATE VIEW statement that creates the specified view. Syntax: \"SHOW CREATE VIEW [catalog_name.][db_name.]view_name\"")
.commandDescription(
"SHOW FUNCTIONS",
"Shows all user-defined and built-in functions in the current catalog and current database. Use \"SHOW USER FUNCTIONS\" for listing all user-defined functions in the current catalog and current database.")
.commandDescription(
"SHOW MODULES",
"Shows all enabled module names with resolution order.")
.commandDescription(
"USE CATALOG",
"Sets the current catalog. All subsequent commands that do not explicitly specify a catalog will use this one. If the provided catalog does not exist, an exception is thrown. The default current catalog is default_catalog. Syntax: \"USE CATALOG catalog_name\"")
.commandDescription(
"USE",
"Sets the current database. All subsequent commands that do not explicitly specify a database will use this one. If the provided database does not exist, an exception is thrown. The default current database is default_database. Syntax: \"USE [catalog_name.]database_name\"")
.commandDescription(
"DESC",
"Describes the schema of a table with the given name. Syntax: \"{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name\"")
.commandDescription(
"ANALYZE",
"ANALYZE statements are used to collect statistics for existing tables and store the result to catalog. Only supports in batch mode. Syntax: \"ANALYZE TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [, partcol2[=val2], ...]) COMPUTE STATISTICS [FOR COLUMNS col1 [, col2, ...] | FOR ALL COLUMNS]\"")
.commandDescription(
"ALTER TABLE",
"Renames a table or change a table's properties. Syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name\", the other syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name SET ( key1=val1[, key2=val2, ...] )\"")
.commandDescription(
"ALTER VIEW",
"Renames a given view to a new name within the same catalog and database. Syntax: \"ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name\"")
.commandDescription(
"ALTER DATABASE",
"Changes a database's properties. Syntax: \"ALTER DATABASE [catalog_name.]db_name SET ( key1=val1[, key2=val2, ...] )\"")
.commandDescription(
"ALTER FUNCTION",
"Changes a catalog function with the new identifier and optional language tag. Syntax: \"ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]\"")
.commandDescription(
"DROP CATALOG",
"Drops a catalog with the given catalog name. Syntax: \"DROP CATALOG [IF EXISTS] catalog_name\"")
.commandDescription(
"DROP DATABASE",
"Drops a database with the given database name. Syntax: \"DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]\"")
.commandDescription(
"DROP TABLE",
"Drops a table with the given table name. Syntax: \"DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name\"")
.commandDescription(
"DROP VIEW",
"Drops a view with the given view name. Syntax: \"DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name\"")
.commandDescription(
"DROP FUNCTION",
"Drops a catalog function with the given function name. Syntax: \"DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name\"")
.commandDescription(
"CREATE CATALOG",
"Creates a catalog with the given catalog properties. Syntax: \"CREATE CATALOG catalog_name WITH ( 'key1'='value1'[, 'key2'='value2', ...] )\"")
.commandDescription(
"CREATE DATABASE",
"Creates a database with the given database properties. Syntax: \"CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name [COMMENT 'database_comment'] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )]\"")
.commandDescription(
"CREATE TABLE",
"Creates a table with the given table properties. Syntax: \"CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { col_name data_type [COMMENT col_comment] [column_constraint] | table_constraint } [,...] ) [COMMENT table_comment] [PARTITIONED BY (col_name, col_name, ...)] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )] \"")
.commandDescription(
"CREATE VIEW",
"Creates a view with the given view expression. Syntax: \"CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name [(column_name [,...])] [COMMENT view_comment] AS query_expression\"")
.commandDescription(
"CREATE FUNCTION",
"Creates a catalog function with the given function properties. Syntax: \"CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]\"")
.commandDescription(
"SHOW JOBS",
"Show the jobs in the Flink cluster. Supports in version 1.17 and later.")
.commandDescription(
"STOP JOB",
"Stop the job with the given job ID. Supports in version 1.17 and later. Syntax: \"STOP JOB '<job_id>' [WITH SAVEPOINT] [WITH DRAIN]\"")
.commandDescription(
"UPDATE",
"Performs row-level updating on the target table. Only supports in batch mode. Supports in version 1.17 and later. Syntax: \"UPDATE [catalog_name.][db_name.]table_name SET col_name1 = col_val1 [, col_name2 = col_val2 ...] [WHERE condition]\"")
.commandDescription(
"DELETE",
"Performs row-level deleting on the target table. Only supports in batch mode. Supports in version 1.17 and later. Syntax: \"DELETE FROM [catalog_name.][db_name.]table_name [WHERE condition]\"")
.commandDescription(
"TRUNCATE TABLE",
"Truncates the target table. Only supports in batch mode. Supports in version 1.18 and later. Syntax: \"TRUNCATE TABLE [catalog_name.][db_name.]table_name\"")
.commandDescription(
"CALL",
"Calls a stored procedure. Supports in version 1.18 and later. Syntax: \"CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )\"")
.build()
// scalastyle:on
}

View File

@ -58,6 +58,13 @@ object ResultSetUtil {
.data(Array[Row](Row.of("OK")))
.build
def helpMessageResultSet: ResultSet =
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical("result", DataTypes.STRING))
.data(Array[Row](Row.of(CommandStrings.MESSAGE_HELP.toString)))
.build
def fromResultFetcher(
resultFetcher: ResultFetcher,
maxRows: Int,

View File

@ -32,7 +32,7 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.result.{CommandStrings, Constants}
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
@ -1265,4 +1265,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
})
assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
}
test("execute statement - help") {
withJdbcStatement() { stmt =>
val resultSet = stmt.executeQuery("help")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) === "result")
assert(resultSet.next())
assert(resultSet.getString(1).equals(CommandStrings.MESSAGE_HELP.toString))
}
}
}

View File

@ -1385,6 +1385,12 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>