[KYUUBI #1839] Minor refactor OperationUtil

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

This is a sub-task of KPIP-2 #1322.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1843 from link3280/improvemnt/KYUUBI-1839.

Closes #1839

b8704274 [Paul Lin] [KYUUBI #1839] Improve docs and comments
a187a539 [Paul Lin] [KYUUBI #1839] Improve docs and comments
7c36c5ca [Paul Lin] [KYUUBI #1839] Remove unused imports
48d611db [Paul Lin] [KYUUBI #1839] Rewrite ResultSetUtil using scala
92a2e3b6 [Paul Lin] [KYUUBI #1839] Rename OperationUtil to ResultSetUtil
811d3a76 [Paul Lin] [KYUUBI #1839] Minor refactor OperationUtil

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Paul Lin 2022-01-27 19:11:42 +08:00 committed by Cheng Pan
parent 33d53db3ff
commit 65d50e24ee
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
6 changed files with 67 additions and 87 deletions

View File

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.result;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
/** Utility class for flink operation. */
public class OperationUtil {
public static ResultSet stringListToResultSet(List<String> strings, String columnName) {
List<Row> data = new ArrayList<>();
boolean isNullable = false;
int maxLength = VarCharType.DEFAULT_LENGTH;
for (String str : strings) {
if (str == null) {
isNullable = true;
} else {
maxLength = Math.max(str.length(), maxLength);
data.add(Row.of(str));
}
}
DataType dataType = DataTypes.VARCHAR(maxLength);
if (!isNullable) {
dataType.notNull();
}
return ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical(columnName, dataType))
.data(data.toArray(new Row[0]))
.build();
}
/**
* Build a simple result with OK message. Returned when SQL commands are executed successfully.
* Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its
* cursor).
*
* @return A simple result with OK message.
*/
public static ResultSet successResultSet() {
return ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical("result", DataTypes.STRING()))
.data(new Row[] {Row.of("OK")})
.build();
}
}

View File

@ -32,7 +32,7 @@ import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
import org.apache.flink.types.Row
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.{OperationUtil, ResultSet}
import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@ -198,7 +198,7 @@ class ExecuteStatement(
// reset all properties
executor.resetSessionProperties(sessionId)
}
resultSet = OperationUtil.successResultSet()
resultSet = ResultSetUtil.successResultSet
setState(OperationState.FINISHED)
}

View File

@ -17,9 +17,7 @@
package org.apache.kyuubi.engine.flink.operation
import scala.collection.JavaConverters._
import org.apache.kyuubi.engine.flink.result.OperationUtil
import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
@ -30,8 +28,8 @@ class GetCatalogs(session: Session)
override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val catalogs = tableEnv.listCatalogs.toList.asJava
resultSet = OperationUtil.stringListToResultSet(catalogs, TABLE_CAT)
val catalogs = tableEnv.listCatalogs.toList
resultSet = ResultSetUtil.stringListToResultSet(catalogs, TABLE_CAT)
} catch onError()
}
}

View File

@ -17,9 +17,7 @@
package org.apache.kyuubi.engine.flink.operation
import scala.collection.JavaConverters.seqAsJavaListConverter
import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
import org.apache.kyuubi.engine.flink.result.{Constants, ResultSetUtil}
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_TYPE
import org.apache.kyuubi.session.Session
@ -28,7 +26,7 @@ class GetTableTypes(session: Session)
extends FlinkOperation(OperationType.GET_TABLE_TYPES, session) {
override protected def runInternal(): Unit = {
val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList.asJava
resultSet = OperationUtil.stringListToResultSet(tableTypes, TABLE_TYPE)
val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList
resultSet = ResultSetUtil.stringListToResultSet(tableTypes, TABLE_TYPE)
}
}

View File

@ -23,7 +23,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.catalog.ObjectIdentifier
import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
import org.apache.kyuubi.engine.flink.result.{Constants, ResultSetUtil}
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session
@ -65,7 +65,7 @@ class GetTables(
}
}
resultSet = OperationUtil.stringListToResultSet(tables.asJava, Constants.SHOW_TABLES_RESULT)
resultSet = ResultSetUtil.stringListToResultSet(tables.toList, Constants.SHOW_TABLES_RESULT)
} catch onError()
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.result;
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
/** Utility object for building ResultSet. */
object ResultSetUtil {
/**
* Build a ResultSet with a column name and a list of String values.
*
* @param strings list of String values
* @param columnName name of the result column
* @return a ResultSet with a string column
*/
def stringListToResultSet(strings: List[String], columnName: String): ResultSet = {
val rows: Array[Row] = strings.map(s => Row.of(s)).toArray
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical(columnName, DataTypes.STRING))
.data(rows)
.build
}
/**
* Build a simple ResultSet with OK message. Returned when SQL commands are executed successfully.
* Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its
* cursor).
*
* @return a simple ResultSet with OK message.
*/
def successResultSet: ResultSet =
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical("result", DataTypes.STRING))
.data(Array[Row](Row.of("OK")))
.build
}