diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index eec5c1cd9..1e12b82d0 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -105,6 +105,12 @@
provided
+
+ org.apache.flink
+ flink-table-planner-loader
+ provided
+
+
org.apache.kyuubi
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 0e0c476e2..f30b6ab86 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,10 +17,15 @@
package org.apache.kyuubi.engine.flink.operation
+import java.util.Optional
+
import scala.concurrent.duration.Duration
import org.apache.flink.api.common.JobID
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.gateway.api.operation.OperationHandle
+import org.apache.flink.table.operations.Operation
+import org.apache.flink.table.operations.command.HelpOperation
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
@@ -28,6 +33,7 @@ import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.{DynConstructors, DynFields, DynMethods}
class ExecuteStatement(
session: Session,
@@ -59,6 +65,14 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
setState(OperationState.RUNNING)
+
+ val operation = parseExtendedStatement(statement)
+ if (operation.isPresent && operation.get().isInstanceOf[HelpOperation]) {
+ resultSet = ResultSetUtil.helpMessageResultSet
+ setState(OperationState.FINISHED)
+ return
+ }
+
val resultFetcher = executor.executeStatement(
new OperationHandle(getHandle.identifier),
statement)
@@ -71,4 +85,36 @@ class ExecuteStatement(
shutdownTimeoutMonitor()
}
}
+
+ private def parseExtendedStatement(statement: String): Optional[Operation] = {
+ val plannerModuleClassLoader: ClassLoader = getPlannerModuleClassLoader
+ val extendedParser: AnyRef =
+ DynConstructors.builder()
+ .loader(plannerModuleClassLoader)
+ .impl("org.apache.flink.table.planner.parse.ExtendedParser")
+ .build().newInstance()
+ DynMethods.builder("parse")
+ .hiddenImpl(extendedParser.getClass, classOf[String])
+ .buildChecked()
+ .invoke(extendedParser, statement)
+ }
+
+ private def getPlannerModuleClassLoader: ClassLoader = {
+ try {
+ val plannerModule = DynMethods.builder("getInstance")
+ .hiddenImpl("org.apache.flink.table.planner.loader.PlannerModule")
+ .buildStaticChecked()
+ .invoke().asInstanceOf[AnyRef]
+
+ DynFields.builder()
+ .hiddenImpl(plannerModule.getClass, "submoduleClassLoader")
+ .build[ClassLoader].bind(plannerModule).get
+ } catch {
+ case e: Exception =>
+ throw new TableException(
+ "Error obtaining Flink planner module ClassLoader. " +
+ "Make sure a flink-table-planner-loader.jar is on the classpath",
+ e)
+ }
+ }
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
new file mode 100644
index 000000000..56a199fa6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/CommandStrings.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.flink.util.Preconditions
+import org.jline.utils.{AttributedString, AttributedStringBuilder, AttributedStyle}
+
+/**
+ * Utility class that contains all strings for Flink SQL commands and messages.
+ */
+object CommandStrings {
+ private val CMD_DESC_DELIMITER = "\t\t"
+
+ private class SQLCommandsDescriptions {
+ private var commandMaxLength = -1
+ private val commandsDescriptionList = ListBuffer[(String, String)]()
+
+ def commandDescription(command: String, description: String): SQLCommandsDescriptions = {
+ Preconditions.checkState(
+ command.nonEmpty,
+ s"content of command must not be empty.",
+ Seq(): _*)
+ Preconditions.checkState(
+ description.nonEmpty,
+ s"content of command's description must not be empty.",
+ Seq(): _*)
+
+ updateMaxCommandLength(command.length)
+ commandsDescriptionList += ((command, description))
+ this
+ }
+
+ private def updateMaxCommandLength(newLength: Int): Unit = {
+ Preconditions.checkState(newLength > 0)
+ if (commandMaxLength < newLength) {
+ commandMaxLength = newLength
+ }
+ }
+
+ private def formatDescription(input: String): String = {
+ val maxLineLength = 160
+ val newLinePrefix = " " * commandMaxLength + CMD_DESC_DELIMITER
+ val words = input.split(" ")
+
+ val (lastLine, lines) = words.foldLeft(("", List[String]())) {
+ case ((line, lines), word) =>
+ val newLine = if (line.isEmpty) word else line + " " + word
+ if (newLine.length > maxLineLength) (word, lines :+ line) else (newLine, lines)
+ }
+
+ (lines :+ lastLine).mkString("\n" + newLinePrefix)
+ }
+
+ def build(): AttributedString = {
+ val attributedStringBuilder = new AttributedStringBuilder
+ if (commandsDescriptionList.nonEmpty) {
+ commandsDescriptionList.foreach {
+ case (cmd, cmdDesc) =>
+ attributedStringBuilder
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(cmd.padTo(commandMaxLength, " ").mkString)
+ .append(CMD_DESC_DELIMITER)
+ .style(AttributedStyle.DEFAULT)
+ .append(formatDescription(cmdDesc))
+ .append('\n')
+ }
+ }
+ attributedStringBuilder.toAttributedString
+ }
+ }
+
+ // scalastyle:off line.size.limit
+ val MESSAGE_HELP: AttributedString =
+ new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(COMMANDS_DESCRIPTIONS)
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
+ // About Documentation Link.
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ "\nThe above list includes only the most frequently used statements.\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
+ .toAttributedString
+
+ def COMMANDS_DESCRIPTIONS: AttributedString =
+ new SQLCommandsDescriptions()
+ .commandDescription(
+ "HELP",
+ "Prints the available commands or the detailed description of a specified command.")
+ .commandDescription(
+ "SET",
+ "Sets a session configuration property. Syntax: \"SET ''='';\". Use \"SET;\" for listing all properties.")
+ .commandDescription(
+ "RESET",
+ "Resets a session configuration property. Syntax: \"RESET '';\". Use \"RESET;\" for reset all session properties.")
+ .commandDescription(
+ "INSERT INTO",
+ "Inserts the results of a SQL SELECT query into a declared table sink.")
+ .commandDescription(
+ "INSERT OVERWRITE",
+ "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
+ .commandDescription(
+ "SELECT",
+ "Executes a SQL SELECT query on the Flink cluster.")
+ .commandDescription(
+ "EXPLAIN",
+ "Describes the execution plan of a query or table with the given name.")
+ .commandDescription(
+ "BEGIN STATEMENT SET",
+ "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+ .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+ .commandDescription(
+ "ADD JAR",
+ "Adds the specified jar file to the submitted jobs' classloader. Syntax: \"ADD JAR '.jar'\"")
+ .commandDescription(
+ "SHOW JARS",
+ "Shows the list of user-specified jar dependencies. This list is impacted by the ADD JAR commands.")
+ .commandDescription(
+ "SHOW CATALOGS",
+ "Shows the list of registered catalogs.")
+ .commandDescription(
+ "SHOW CURRENT CATALOG",
+ "Shows the name of the current catalog.")
+ .commandDescription(
+ "SHOW DATABASES",
+ "Shows all databases in the current catalog.")
+ .commandDescription(
+ "SHOW CURRENT DATABASE",
+ "Shows the name of the current database.")
+ .commandDescription(
+ "SHOW TABLES",
+ "Shows all tables for an optionally specified database. Syntax: \"SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE ]\"")
+ .commandDescription(
+ "SHOW CREATE TABLE",
+ "Shows the CREATE TABLE statement that creates the specified table.")
+ .commandDescription(
+ "SHOW COLUMNS",
+ "Shows all columns of a table with the given name. Syntax: \"SHOW COLUMNS ( FROM | IN ) [[catalog_name.]database.] [ [NOT] LIKE ]\"")
+ .commandDescription(
+ "SHOW VIEWS",
+ "Shows all views in the current catalog and the current database.")
+ .commandDescription(
+ "SHOW CREATE VIEW",
+ "Shows the CREATE VIEW statement that creates the specified view. Syntax: \"SHOW CREATE VIEW [catalog_name.][db_name.]view_name\"")
+ .commandDescription(
+ "SHOW FUNCTIONS",
+ "Shows all user-defined and built-in functions in the current catalog and current database. Use \"SHOW USER FUNCTIONS\" for listing all user-defined functions in the current catalog and current database.")
+ .commandDescription(
+ "SHOW MODULES",
+ "Shows all enabled module names with resolution order.")
+ .commandDescription(
+ "USE CATALOG",
+ "Sets the current catalog. All subsequent commands that do not explicitly specify a catalog will use this one. If the provided catalog does not exist, an exception is thrown. The default current catalog is default_catalog. Syntax: \"USE CATALOG catalog_name\"")
+ .commandDescription(
+ "USE",
+ "Sets the current database. All subsequent commands that do not explicitly specify a database will use this one. If the provided database does not exist, an exception is thrown. The default current database is default_database. Syntax: \"USE [catalog_name.]database_name\"")
+ .commandDescription(
+ "DESC",
+ "Describes the schema of a table with the given name. Syntax: \"{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name\"")
+ .commandDescription(
+ "ANALYZE",
+ "ANALYZE statements are used to collect statistics for existing tables and store the result to catalog. Only supports in batch mode. Syntax: \"ANALYZE TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [, partcol2[=val2], ...]) COMPUTE STATISTICS [FOR COLUMNS col1 [, col2, ...] | FOR ALL COLUMNS]\"")
+ .commandDescription(
+ "ALTER TABLE",
+ "Renames a table or change a table's properties. Syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name\", the other syntax: \"ALTER TABLE [catalog_name.][db_name.]table_name SET ( key1=val1[, key2=val2, ...] )\"")
+ .commandDescription(
+ "ALTER VIEW",
+ "Renames a given view to a new name within the same catalog and database. Syntax: \"ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name\"")
+ .commandDescription(
+ "ALTER DATABASE",
+ "Changes a database's properties. Syntax: \"ALTER DATABASE [catalog_name.]db_name SET ( key1=val1[, key2=val2, ...] )\"")
+ .commandDescription(
+ "ALTER FUNCTION",
+ "Changes a catalog function with the new identifier and optional language tag. Syntax: \"ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]\"")
+ .commandDescription(
+ "DROP CATALOG",
+ "Drops a catalog with the given catalog name. Syntax: \"DROP CATALOG [IF EXISTS] catalog_name\"")
+ .commandDescription(
+ "DROP DATABASE",
+ "Drops a database with the given database name. Syntax: \"DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]\"")
+ .commandDescription(
+ "DROP TABLE",
+ "Drops a table with the given table name. Syntax: \"DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name\"")
+ .commandDescription(
+ "DROP VIEW",
+ "Drops a view with the given view name. Syntax: \"DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name\"")
+ .commandDescription(
+ "DROP FUNCTION",
+ "Drops a catalog function with the given function name. Syntax: \"DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name\"")
+ .commandDescription(
+ "CREATE CATALOG",
+ "Creates a catalog with the given catalog properties. Syntax: \"CREATE CATALOG catalog_name WITH ( 'key1'='value1'[, 'key2'='value2', ...] )\"")
+ .commandDescription(
+ "CREATE DATABASE",
+ "Creates a database with the given database properties. Syntax: \"CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name [COMMENT 'database_comment'] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )]\"")
+ .commandDescription(
+ "CREATE TABLE",
+ "Creates a table with the given table properties. Syntax: \"CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { col_name data_type [COMMENT col_comment] [column_constraint] | table_constraint } [,...] ) [COMMENT table_comment] [PARTITIONED BY (col_name, col_name, ...)] [WITH ( 'key1'='value1'[, 'key2'='value2', ...] )] \"")
+ .commandDescription(
+ "CREATE VIEW",
+ "Creates a view with the given view expression. Syntax: \"CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name [(column_name [,...])] [COMMENT view_comment] AS query_expression\"")
+ .commandDescription(
+ "CREATE FUNCTION",
+ "Creates a catalog function with the given function properties. Syntax: \"CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR '.jar' [, JAR '.jar']* ]\"")
+ .commandDescription(
+ "SHOW JOBS",
+ "Show the jobs in the Flink cluster. Supports in version 1.17 and later.")
+ .commandDescription(
+ "STOP JOB",
+ "Stop the job with the given job ID. Supports in version 1.17 and later. Syntax: \"STOP JOB '' [WITH SAVEPOINT] [WITH DRAIN]\"")
+ .commandDescription(
+ "UPDATE",
+ "Performs row-level updating on the target table. Only supports in batch mode. Supports in version 1.17 and later. Syntax: \"UPDATE [catalog_name.][db_name.]table_name SET col_name1 = col_val1 [, col_name2 = col_val2 ...] [WHERE condition]\"")
+ .commandDescription(
+ "DELETE",
+ "Performs row-level deleting on the target table. Only supports in batch mode. Supports in version 1.17 and later. Syntax: \"DELETE FROM [catalog_name.][db_name.]table_name [WHERE condition]\"")
+ .commandDescription(
+ "TRUNCATE TABLE",
+ "Truncates the target table. Only supports in batch mode. Supports in version 1.18 and later. Syntax: \"TRUNCATE TABLE [catalog_name.][db_name.]table_name\"")
+ .commandDescription(
+ "CALL",
+ "Calls a stored procedure. Supports in version 1.18 and later. Syntax: \"CALL [catalog_name.][database_name.]procedure_name ([ expression [, expression]* ] )\"")
+ .build()
+ // scalastyle:on
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index 8b722f1e5..583c64fc6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -58,6 +58,13 @@ object ResultSetUtil {
.data(Array[Row](Row.of("OK")))
.build
+ def helpMessageResultSet: ResultSet =
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(Column.physical("result", DataTypes.STRING))
+ .data(Array[Row](Row.of(CommandStrings.MESSAGE_HELP.toString)))
+ .build
+
def fromResultFetcher(
resultFetcher: ResultFetcher,
maxRows: Int,
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9469cf286..76f718976 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
-import org.apache.kyuubi.engine.flink.result.Constants
+import org.apache.kyuubi.engine.flink.result.{CommandStrings, Constants}
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
@@ -1265,4 +1265,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
})
assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
}
+
+ test("execute statement - help") {
+ withJdbcStatement() { stmt =>
+ val resultSet = stmt.executeQuery("help")
+ val metadata = resultSet.getMetaData
+ assert(metadata.getColumnName(1) === "result")
+ assert(resultSet.next())
+ assert(resultSet.getString(1).equals(CommandStrings.MESSAGE_HELP.toString))
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 75bfd8650..815d5ed22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1385,6 +1385,12 @@
provided
+
+ org.apache.flink
+ flink-table-planner-loader
+ ${flink.version}
+
+
org.apache.flink
flink-sql-client