From d2f10e9da800307dd793b014bcfb05720c5731b3 Mon Sep 17 00:00:00 2001 From: xifeng yang Date: Tue, 11 Jan 2022 15:11:32 +0800 Subject: [PATCH] [KYUUBI #1646] support flink get tables ### _Why are the changes needed?_ ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate ![image](https://user-images.githubusercontent.com/12961802/148889184-e56fc78b-fb03-42f5-b866-1d06b8c335a0.png) - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1713 from xifeng/kyuubi-1646. Closes #1646 01522433 [xifeng yang] [KYUUBI #1646] support flink get tables Authored-by: xifeng yang Signed-off-by: Kent Yao --- .../kyuubi/engine/flink/result/Constants.java | 5 +- .../operation/FlinkSQLOperationManager.scala | 22 ++++- .../engine/flink/operation/GetTables.scala | 86 +++++++++++++++++++ .../flink/operation/FlinkOperationSuite.scala | 51 +++++++++++ .../spark/operation/SparkOperation.scala | 27 +----- .../kyuubi/operation/AbstractOperation.scala | 24 ++++++ 6 files changed, 187 insertions(+), 28 deletions(-) create mode 100644 externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java index 9151ed864..3c1f1abdc 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java +++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java @@ -54,5 +54,8 @@ public class Constants { public static final String SET_KEY = "key"; public static final String SET_VALUE = "value"; - public static final String[] SUPPORTED_TABLE_TYPES = new String[] {"TABLE", "VIEW"}; + public static final String TABLE_TYPE = "TABLE"; + public static final String VIEW_TYPE = "VIEW"; + + public static final String[] SUPPORTED_TABLE_TYPES = new String[] {TABLE_TYPE, VIEW_TYPE}; } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala index b54be21ac..f1fba96e1 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -19,6 +19,9 @@ package org.apache.kyuubi.engine.flink.operation import java.util +import scala.collection.JavaConverters.asScalaBufferConverter + +import org.apache.kyuubi.engine.flink.result.Constants import org.apache.kyuubi.operation.{Operation, OperationManager} import org.apache.kyuubi.session.Session @@ -51,7 +54,24 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage catalogName: String, schemaName: String, tableName: String, - tableTypes: util.List[String]): Operation = null + tableTypes: util.List[String]): Operation = { + + val tTypes = + if (tableTypes == null || tableTypes.isEmpty) { + Constants.SUPPORTED_TABLE_TYPES.toSet + } else { + tableTypes.asScala.toSet + } + + val op = new GetTables( + session = session, + catalog = catalogName, + schema = schemaName, + tableName = tableName, + tableTypes = tTypes) + + addOperation(op) + } override def newGetTableTypesOperation(session: Session): Operation = { val op = new GetTableTypes(session) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala new file mode 100644 index 000000000..f9d6e981c --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.flink.operation + +import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, seqAsJavaListConverter} +import scala.util.{Failure, Success, Try} + +import org.apache.flink.table.catalog.ObjectIdentifier + +import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil} +import org.apache.kyuubi.operation.OperationType +import org.apache.kyuubi.session.Session + +class GetTables( + session: Session, + catalog: String, + schema: String, + tableName: String, + tableTypes: Set[String]) + extends FlinkOperation(OperationType.GET_TABLES, session) { + + override protected def runInternal(): Unit = { + try { + val tableEnv = sessionContext.getExecutionContext.getTableEnvironment + + var catalogName = catalog + if (catalog == null || catalog.isEmpty) { + catalogName = tableEnv.getCurrentCatalog + } + + val schemaPattern = toJavaRegex(schema).r.pattern + val tableNamePattern = toJavaRegex(tableName).r.pattern + + var tables = List[String]() + + val optional = tableEnv.getCatalog(catalogName) + if (optional.isPresent) { + val currCatalog = optional.get() + tables = currCatalog.listDatabases().asScala + .filter(database => + schemaPattern.matcher(database).matches()) + .flatMap { database => + currCatalog.listTables(database).asScala + .filter(identifier => + tableNamePattern.matcher(identifier).matches()) + .filter(identifier => { + // only table or view + if (!tableTypes.contains(Constants.TABLE_TYPE) || !tableTypes.contains( + Constants.VIEW_TYPE)) { + // try to get table kind + Try(currCatalog.getTable(ObjectIdentifier.of( + catalogName, + database, + identifier).toObjectPath)) match { + case Success(table) => tableTypes.contains(table.getTableKind.name()) + case Failure(_) => false + } + } else { + true + } + }) + }.toList + } + + resultSet = OperationUtil.stringListToResultSet( + tables.asJava, + Constants.SHOW_TABLES_RESULT) + + } catch onError() + } +} diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 20a8f61a0..7bd31ecd1 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -68,6 +68,57 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { } } + test("get tables") { + val table = "table_1_test" + val table_view = "table_1_test_view" + + withJdbcStatement(table) { statement => + statement.execute( + s""" + | create table $table ( + | id int, + | name string, + | price double + | ) with ( + | 'connector' = 'filesystem' + | ) + """.stripMargin) + + statement.execute( + s""" + | create view ${table_view} + | as select 1 + """.stripMargin) + + val metaData = statement.getConnection.getMetaData + val rs1 = metaData.getTables(null, null, null, null) + assert(rs1.next()) + assert(rs1.getString(1) == table) + assert(rs1.next()) + assert(rs1.getString(1) == table_view) + + // get table , table name like table% + val rs2 = metaData.getTables(null, null, "table%", Array("TABLE")) + assert(rs2.next()) + assert(rs2.getString(1) == table) + assert(!rs2.next()) + + // get view , view name like * + val rs3 = metaData.getTables(null, "default_database", "*", Array("VIEW")) + assert(rs3.next()) + assert(rs3.getString(1) == table_view) + + // get view , view name like *, schema pattern like default_% + val rs4 = metaData.getTables(null, "default_%", "*", Array("VIEW")) + assert(rs4.next()) + assert(rs4.getString(1) == table_view) + + // get view , view name like *, schema pattern like no_exists_% + val rs5 = metaData.getTables(null, "no_exists_%", "*", Array("VIEW")) + assert(!rs5.next()) + } + } + test("execute statement - select column name with dots") { withJdbcStatement() { statement => val resultSet = statement.executeQuery("select 'tmp.hello'") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 4f91b1e06..314b2276b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -20,7 +20,6 @@ package org.apache.kyuubi.engine.spark.operation import java.io.IOException import java.time.ZoneId -import org.apache.commons.lang3.StringUtils import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema} import org.apache.spark.kyuubi.SparkUtilsHelper.redact import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -31,8 +30,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY -import org.apache.kyuubi.engine.spark.schema.RowSet -import org.apache.kyuubi.engine.spark.schema.SchemaHelper +import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper} import org.apache.kyuubi.engine.spark.session.SparkSessionImpl import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState} import org.apache.kyuubi.operation.FetchOrientation._ @@ -69,29 +67,6 @@ abstract class SparkOperation(opType: OperationType, session: Session) } } - /** - * convert SQL 'like' pattern to a Java regular expression. - * - * Underscores (_) are converted to '.' and percent signs (%) are converted to '.*'. - * - * (referred to Spark's implementation: convertPattern function in file MetadataOperation.java) - * - * @param input the SQL pattern to convert - * @return the equivalent Java regular expression of the pattern - */ - protected def toJavaRegex(input: String): String = { - val res = - if (StringUtils.isEmpty(input) || input == "*") { - "%" - } else { - input - } - val wStr = ".*" - res - .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr) - .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".") - } - private val forceCancel = session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index ebfb49b19..9b6d73d6c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.operation import java.util.concurrent.Future +import org.apache.commons.lang3.StringUtils import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema} import org.apache.kyuubi.{KyuubiSQLException, Logging} @@ -145,6 +146,29 @@ abstract class AbstractOperation(opType: OperationType, session: Session) override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet + /** + * convert SQL 'like' pattern to a Java regular expression. + * + * Underscores (_) are converted to '.' and percent signs (%) are converted to '.*'. + * + * (referred to Spark's implementation: convertPattern function in file MetadataOperation.java) + * + * @param input the SQL pattern to convert + * @return the equivalent Java regular expression of the pattern + */ + protected def toJavaRegex(input: String): String = { + val res = + if (StringUtils.isEmpty(input) || input == "*") { + "%" + } else { + input + } + val wStr = ".*" + res + .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr) + .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".") + } + override def getSession: Session = session override def getHandle: OperationHandle = handle