diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java deleted file mode 100644 index b2d66351a..000000000 --- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine.flink.result; - -import java.util.ArrayList; -import java.util.List; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.ResultKind; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.types.Row; - -/** Utility class for flink operation. */ -public class OperationUtil { - - public static ResultSet stringListToResultSet(List strings, String columnName) { - List data = new ArrayList<>(); - boolean isNullable = false; - int maxLength = VarCharType.DEFAULT_LENGTH; - - for (String str : strings) { - if (str == null) { - isNullable = true; - } else { - maxLength = Math.max(str.length(), maxLength); - data.add(Row.of(str)); - } - } - - DataType dataType = DataTypes.VARCHAR(maxLength); - if (!isNullable) { - dataType.notNull(); - } - - return ResultSet.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(Column.physical(columnName, dataType)) - .data(data.toArray(new Row[0])) - .build(); - } - - /** - * Build a simple result with OK message. Returned when SQL commands are executed successfully. - * Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its - * cursor). - * - * @return A simple result with OK message. - */ - public static ResultSet successResultSet() { - return ResultSet.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(Column.physical("result", DataTypes.STRING())) - .data(new Row[] {Row.of("OK")}) - .build(); - } -} diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 85decf64c..b38ceefa8 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.operations.command.{ResetOperation, SetOperation} import org.apache.flink.types.Row import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.engine.flink.result.{OperationUtil, ResultSet} +import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil} import org.apache.kyuubi.operation.{OperationState, OperationType} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -198,7 +198,7 @@ class ExecuteStatement( // reset all properties executor.resetSessionProperties(sessionId) } - resultSet = OperationUtil.successResultSet() + resultSet = ResultSetUtil.successResultSet setState(OperationState.FINISHED) } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala index 0e73c9dcd..a68f6fd24 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala @@ -17,9 +17,7 @@ package org.apache.kyuubi.engine.flink.operation -import scala.collection.JavaConverters._ - -import org.apache.kyuubi.engine.flink.result.OperationUtil +import org.apache.kyuubi.engine.flink.result.ResultSetUtil import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT import org.apache.kyuubi.session.Session @@ -30,8 +28,8 @@ class GetCatalogs(session: Session) override protected def runInternal(): Unit = { try { val tableEnv = sessionContext.getExecutionContext.getTableEnvironment - val catalogs = tableEnv.listCatalogs.toList.asJava - resultSet = OperationUtil.stringListToResultSet(catalogs, TABLE_CAT) + val catalogs = tableEnv.listCatalogs.toList + resultSet = ResultSetUtil.stringListToResultSet(catalogs, TABLE_CAT) } catch onError() } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala index ad62f6b2c..b1712becb 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala @@ -17,9 +17,7 @@ package org.apache.kyuubi.engine.flink.operation -import scala.collection.JavaConverters.seqAsJavaListConverter - -import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil} +import org.apache.kyuubi.engine.flink.result.{Constants, ResultSetUtil} import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_TYPE import org.apache.kyuubi.session.Session @@ -28,7 +26,7 @@ class GetTableTypes(session: Session) extends FlinkOperation(OperationType.GET_TABLE_TYPES, session) { override protected def runInternal(): Unit = { - val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList.asJava - resultSet = OperationUtil.stringListToResultSet(tableTypes, TABLE_TYPE) + val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList + resultSet = ResultSetUtil.stringListToResultSet(tableTypes, TABLE_TYPE) } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala index ef5482762..250a3c0da 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala @@ -23,7 +23,7 @@ import scala.util.{Failure, Success, Try} import org.apache.commons.lang3.StringUtils import org.apache.flink.table.catalog.ObjectIdentifier -import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil} +import org.apache.kyuubi.engine.flink.result.{Constants, ResultSetUtil} import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.session.Session @@ -65,7 +65,7 @@ class GetTables( } } - resultSet = OperationUtil.stringListToResultSet(tables.asJava, Constants.SHOW_TABLES_RESULT) + resultSet = ResultSetUtil.stringListToResultSet(tables.toList, Constants.SHOW_TABLES_RESULT) } catch onError() } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala new file mode 100644 index 000000000..ded271cf1 --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.flink.result; + +import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.api.ResultKind +import org.apache.flink.table.catalog.Column +import org.apache.flink.types.Row + +/** Utility object for building ResultSet. */ +object ResultSetUtil { + + /** + * Build a ResultSet with a column name and a list of String values. + * + * @param strings list of String values + * @param columnName name of the result column + * @return a ResultSet with a string column + */ + def stringListToResultSet(strings: List[String], columnName: String): ResultSet = { + val rows: Array[Row] = strings.map(s => Row.of(s)).toArray + ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns(Column.physical(columnName, DataTypes.STRING)) + .data(rows) + .build + } + + /** + * Build a simple ResultSet with OK message. Returned when SQL commands are executed successfully. + * Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its + * cursor). + * + * @return a simple ResultSet with OK message. + */ + def successResultSet: ResultSet = + ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns(Column.physical("result", DataTypes.STRING)) + .data(Array[Row](Row.of("OK"))) + .build +}