[KYUUBI #1681] Extend Flink ExecuteStatement to support more operations


### _Why are the changes needed?_

The Flink engine's ExecuteStatement currently supports only QueryOperation. We can extend it to cover the other operations that the Flink Executor already supports, e.g. ShowTableOperation, CreateTableOperation, ExplainOperation.

This is a sub-task of KPIP-2 (#1322).
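
At its core, the change makes `ExecuteStatement` dispatch on the type of the parsed operation instead of assuming a `QueryOperation`. A minimal sketch of the dispatch, using the helper names introduced in the diff below:

```scala
import org.apache.flink.table.operations.{Operation, QueryOperation}

// Sketch of the new dispatch in ExecuteStatement (see the Scala diff below).
// A QueryOperation keeps the existing snapshot-based result loop; any other
// Operation is executed directly and its TableResult is converted into a
// Kyuubi ResultSet via ResultSet.fromTableResult.
val operation = executor.parseStatement(sessionId, statement)
operation match {
  case queryOperation: QueryOperation => runQueryOperation(queryOperation)
  case operation: Operation => runOperation(operation)
}
```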

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
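
For a quick manual check, the newly supported operations can also be exercised through Kyuubi's JDBC frontend. A hypothetical smoke test (the connection URL and object name are placeholders) mirroring the new `FlinkOperationSuite` cases:

```scala
import java.sql.DriverManager

// Placeholder endpoint; point it at a Kyuubi server backed by the Flink engine.
object FlinkEngineSmokeTest extends App {
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
  try {
    val stmt = conn.createStatement()
    stmt.execute("create database db_a") // non-query DDL, handled by runOperation
    stmt.execute("show databases")       // rows built via ResultSet.fromTableResult
    stmt.execute("explain select 1")
    stmt.execute("drop database db_a")
  } finally {
    conn.close()
  }
}
```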

Closes #1693 from link3280/feature/extend_flink_operation.

Closes #1681

520d1f42 [Paul Lin] [KYUUBI #1681] Extend Flink ExecuteStatement to support more operations

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: yanghua <yanghua1127@gmail.com>
Paul Lin authored on 2022-01-10 10:09:19 +08:00, committed by yanghua
parent 6f4427b1ca
commit e2fb7d6f9b
7 changed files with 169 additions and 165 deletions

org/apache/kyuubi/engine/flink/result/ColumnInfo.java (deleted)

@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.result;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.util.Preconditions;
/** A column info represents a table column's structure with column name, column type. */
public class ColumnInfo {
private String name;
private String type;
@Nullable private LogicalType logicalType;
public ColumnInfo(String name, String type) {
this.name = Preconditions.checkNotNull(name, "name must not be null");
this.type = Preconditions.checkNotNull(type, "type must not be null");
}
public static ColumnInfo create(String name, LogicalType type) {
return new ColumnInfo(name, type.toString());
}
public String getName() {
return name;
}
public String getType() {
return type;
}
public LogicalType getLogicalType() {
if (logicalType == null) {
logicalType = LogicalTypeParser.parse(type);
}
return logicalType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ColumnInfo that = (ColumnInfo) o;
return name.equals(that.name) && type.equals(that.type);
}
@Override
public int hashCode() {
return Objects.hash(name, type);
}
@Override
public String toString() {
return "ColumnInfo{" + "name='" + name + '\'' + ", type='" + type + '\'' + '}';
}
}

org/apache/kyuubi/engine/flink/result/OperationUtil.java

@@ -20,6 +20,10 @@ package org.apache.kyuubi.engine.flink.result;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
@@ -40,9 +44,14 @@ public class OperationUtil {
}
}
DataType dataType = DataTypes.VARCHAR(maxLength);
if (!isNullable) {
dataType = dataType.notNull(); // DataType is immutable; notNull() returns a new instance
}
return ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(ColumnInfo.create(columnName, new VarCharType(isNullable, maxLength)))
.columns(Column.physical(columnName, dataType))
.data(data.toArray(new Row[0]))
.build();
}

org/apache/kyuubi/engine/flink/result/ResultKind.java (deleted)

@@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.result;
/** ResultKind defines the types of the result. */
public enum ResultKind {
// for DDL, DCL and statements with a simple "OK"
SUCCESS,
// rows with important content are available (DML, DQL)
SUCCESS_WITH_CONTENT
}

org/apache/kyuubi/engine/flink/result/ResultSet.java

@@ -19,24 +19,29 @@
package org.apache.kyuubi.engine.flink.result;
import com.google.common.collect.Iterators;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.kyuubi.operation.ArrayFetchIterator;
import org.apache.kyuubi.operation.FetchIterator;
/**
* A set of one statement execution result containing result kind, column infos, rows of data and
* change flags for streaming mode.
* A set of one statement execution result containing result kind, columns, rows of data and change
* flags for streaming mode.
*/
public class ResultSet {
private final ResultKind resultKind;
private final List<ColumnInfo> columns;
private final List<Column> columns;
private final FetchIterator<Row> data;
// null in batch mode
@@ -47,7 +52,7 @@ public class ResultSet {
private ResultSet(
ResultKind resultKind,
List<ColumnInfo> columns,
List<Column> columns,
FetchIterator<Row> data,
@Nullable List<Boolean> changeFlags) {
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind must not be null");
@@ -61,7 +66,7 @@ public class ResultSet {
}
}
public List<ColumnInfo> getColumns() {
public List<Column> getColumns() {
return columns;
}
@@ -103,6 +108,19 @@
+ '}';
}
public static ResultSet fromTableResult(TableResult tableResult) {
ResolvedSchema schema = tableResult.getResolvedSchema();
// collect all rows from table result as list
// this is ok as TableResult contains limited rows
List<Row> rows = new ArrayList<>();
tableResult.collect().forEachRemaining(rows::add);
return builder()
.resultKind(tableResult.getResultKind())
.columns(schema.getColumns())
.data(rows.toArray(new Row[0]))
.build();
}
public static Builder builder() {
return new Builder();
}
@@ -110,7 +128,7 @@
/** Builder for {@link ResultSet}. */
public static class Builder {
private ResultKind resultKind = null;
private List<ColumnInfo> columns = null;
private List<Column> columns = null;
private FetchIterator<Row> data = null;
private List<Boolean> changeFlags = null;
@@ -122,14 +140,14 @@
return this;
}
/** Set {@link ColumnInfo}s. */
public Builder columns(ColumnInfo... columns) {
/** Set columns. */
public Builder columns(Column... columns) {
this.columns = Arrays.asList(columns);
return this;
}
/** Set {@link ColumnInfo}s. */
public Builder columns(List<ColumnInfo> columns) {
/** Set columns. */
public Builder columns(List<Column> columns) {
this.columns = columns;
return this;
}

org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

@@ -17,19 +17,19 @@
package org.apache.kyuubi.engine.flink.operation
import java.util
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
import org.apache.flink.table.operations.{Operation, QueryOperation}
import org.apache.flink.types.Row
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -45,10 +45,6 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
private var resultDescriptor: ResultDescriptor = _
private var columnInfos: util.List[ColumnInfo] = _
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -101,39 +97,11 @@ class ExecuteStatement(
try {
setState(OperationState.RUNNING)
columnInfos = new util.ArrayList[ColumnInfo]
val operation = executor.parseStatement(sessionId, statement)
resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
operation match {
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
case operation: Operation => runOperation(operation)
}
val resultID = resultDescriptor.getResultId
val rows = new ArrayBuffer[Row]()
var loop = true
while (loop) {
Thread.sleep(50) // slow the processing down
val result = executor.snapshotResult(sessionId, resultID, 2)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
rows.clear()
(1 to result.getPayload).foreach { page =>
rows ++= executor.retrieveResultPage(resultID, page).asScala
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
}
}
resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos)
.data(rows.toArray[Row])
.build
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
@@ -141,6 +109,43 @@
}
}
private def runQueryOperation(operation: QueryOperation): Unit = {
val resultDescriptor = executor.executeQuery(sessionId, operation)
val resultID = resultDescriptor.getResultId
val rows = new ArrayBuffer[Row]()
var loop = true
while (loop) {
Thread.sleep(50) // slow the processing down
val result = executor.snapshotResult(sessionId, resultID, 2)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
rows.clear()
(1 to result.getPayload).foreach { page =>
rows ++= executor.retrieveResultPage(resultID, page).asScala
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
}
}
resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(resultDescriptor.getResultSchema.getColumns)
.data(rows.toArray[Row])
.build
setState(OperationState.FINISHED)
}
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
result.await()
resultSet = ResultSet.fromTableResult(result)
setState(OperationState.FINISHED)
}
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =

RowSet.scala

@@ -24,11 +24,12 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.types.logical.{DecimalType, _}
import org.apache.flink.types.Row
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultSet}
import org.apache.kyuubi.engine.flink.result.ResultSet
object RowSet {
@@ -58,7 +59,7 @@
val size = rows.length
val tRowSet = new TRowSet(0, new util.ArrayList[TRow](size))
resultSet.getColumns.asScala.zipWithIndex.foreach { case (filed, i) =>
val tColumn = toTColumn(rows, i, filed.getLogicalType)
val tColumn = toTColumn(rows, i, filed.getDataType.getLogicalType)
tRowSet.addToColumns(tColumn)
}
tRowSet
@@ -69,7 +70,7 @@
row: Row,
resultSet: ResultSet): TColumnValue = {
val logicalType = resultSet.getColumns.get(ordinal).getLogicalType
val logicalType = resultSet.getColumns.get(ordinal).getDataType.getLogicalType
logicalType match {
case _: BooleanType =>
@@ -184,11 +185,11 @@
ret
}
def toTColumnDesc(field: ColumnInfo, pos: Int): TColumnDesc = {
def toTColumnDesc(field: Column, pos: Int): TColumnDesc = {
val tColumnDesc = new TColumnDesc()
tColumnDesc.setColumnName(field.getName)
tColumnDesc.setTypeDesc(toTTypeDesc(field.getLogicalType))
tColumnDesc.setComment("")
tColumnDesc.setTypeDesc(toTTypeDesc(field.getDataType.getLogicalType))
tColumnDesc.setComment(field.getComment.orElse(""))
tColumnDesc.setPosition(pos)
tColumnDesc
}

FlinkOperationSuite.scala

@@ -90,4 +90,84 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
assert(metaData.getScale(2) == 2)
}
}
test("execute statement - show functions") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("show functions")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) == "function name")
assert(resultSet.next())
}
}
test("execute statement - show databases") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("show databases")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) == "database name")
assert(resultSet.next())
assert(resultSet.getString(1) == "default_database")
}
}
test("execute statement - show tables") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("show tables")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) == "table name")
assert(!resultSet.next())
}
}
test("execute statement - explain query") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("explain select 1")
val metadata = resultSet.getMetaData
assert(metadata.getColumnName(1) == "result")
assert(resultSet.next())
}
}
test("execute statement - create/alter/drop catalog") {
// TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
assert(statement.execute("drop catalog cat_a"))
})
}
test("execute statement - create/alter/drop database") {
// TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
statement.executeQuery("create database db_a")
assert(statement.execute("alter database db_a set ('k1' = 'v1')"))
assert(statement.execute("drop database db_a"))
})
}
test("execute statement - create/alter/drop table") {
// TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
statement.executeQuery("create table tbl_a (a string)")
assert(statement.execute("alter table tbl_a rename to tbl_b"))
assert(statement.execute("drop table tbl_b"))
})
}
test("execute statement - create/alter/drop view") {
// TODO: validate table results after FLINK-25558 is resolved
withMultipleConnectionJdbcStatement()({ statement =>
statement.executeQuery("create view view_a as select 1")
assert(statement.execute("alter view view_a rename to view_b"))
assert(statement.execute("drop view view_b"))
})
}
ignore("execute statement - insert into") {
// TODO: ignored temporarily due to KYUUBI #1704
withMultipleConnectionJdbcStatement()({ statement =>
statement.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
statement.executeUpdate("insert into tbl_a select 1")
})
}
}