[KYUUBI #1646] support flink get tables


### _Why are the changes needed?_
The Flink SQL engine previously returned `null` from `FlinkSQLOperationManager#newGetTablesOperation`, so JDBC clients calling `DatabaseMetaData#getTables` against a Flink engine got no operation (and no results) back. This change implements a `GetTables` operation for the Flink engine: it resolves the target catalog, filters databases and tables with SQL 'like' patterns, and narrows results to the requested `TABLE`/`VIEW` types. To share the pattern conversion between engines, the `toJavaRegex` helper is moved from `SparkOperation` up to `AbstractOperation`.
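
Below is a minimal client-side sketch of the call path this PR serves. It is illustrative only: the JDBC URL is a placeholder for a locally running Kyuubi server backed by the Flink SQL engine, and the Hive JDBC driver is assumed to be on the classpath.

```scala
// Hypothetical client sketch; the URL and object name are placeholders,
// not part of the patch.
import java.sql.DriverManager

object GetTablesClientSketch extends App {
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
  try {
    // This metadata call is what FlinkSQLOperationManager#newGetTablesOperation
    // now implements: null catalog/schema fall back to the current ones,
    // "table%" is a SQL 'like' pattern, and the array narrows the table types.
    val rs = conn.getMetaData.getTables(null, null, "table%", Array("TABLE", "VIEW"))
    while (rs.next()) {
      println(rs.getString(1)) // the test suite below reads the table name from column 1
    }
  } finally conn.close()
}
```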

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

![image](https://user-images.githubusercontent.com/12961802/148889184-e56fc78b-fb03-42f5-b866-1d06b8c335a0.png)

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1713 from xifeng/kyuubi-1646.

Closes #1646

01522433 [xifeng yang] [KYUUBI #1646] support flink get tables

Authored-by: xifeng yang <xifeng.yang@hotmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
Committed 2022-01-11 15:11:32 +08:00 (commit d2f10e9da8, parent 4f17f1b1af)
6 changed files with 187 additions and 28 deletions

#### Constants.java (org.apache.kyuubi.engine.flink.result)

```diff
@@ -54,5 +54,8 @@ public class Constants {
   public static final String SET_KEY = "key";
   public static final String SET_VALUE = "value";
-  public static final String[] SUPPORTED_TABLE_TYPES = new String[] {"TABLE", "VIEW"};
+  public static final String TABLE_TYPE = "TABLE";
+  public static final String VIEW_TYPE = "VIEW";
+  public static final String[] SUPPORTED_TABLE_TYPES = new String[] {TABLE_TYPE, VIEW_TYPE};
 }
```

#### FlinkSQLOperationManager.scala (org.apache.kyuubi.engine.flink.operation)

```diff
@@ -19,6 +19,9 @@ package org.apache.kyuubi.engine.flink.operation

 import java.util

+import scala.collection.JavaConverters.asScalaBufferConverter
+
+import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.operation.{Operation, OperationManager}
 import org.apache.kyuubi.session.Session
@@ -51,7 +54,24 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManager") {
       catalogName: String,
       schemaName: String,
       tableName: String,
-      tableTypes: util.List[String]): Operation = null
+      tableTypes: util.List[String]): Operation = {
+    val tTypes =
+      if (tableTypes == null || tableTypes.isEmpty) {
+        Constants.SUPPORTED_TABLE_TYPES.toSet
+      } else {
+        tableTypes.asScala.toSet
+      }
+    val op = new GetTables(
+      session = session,
+      catalog = catalogName,
+      schema = schemaName,
+      tableName = tableName,
+      tableTypes = tTypes)
+    addOperation(op)
+  }

   override def newGetTableTypesOperation(session: Session): Operation = {
     val op = new GetTableTypes(session)
```
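
One behavior worth calling out in the hunk above: a `null` or empty type list from the client is widened to every supported type rather than matching nothing. A standalone restatement of that defaulting rule (the object and function names are illustrative, not part of the patch):

```scala
import java.{util => ju}
import scala.collection.JavaConverters.asScalaBufferConverter

// Hypothetical restatement of the defaulting in newGetTablesOperation:
// no requested types means "all supported types", i.e. TABLE and VIEW.
object TableTypeDefaults {
  def effectiveTableTypes(requested: ju.List[String]): Set[String] =
    if (requested == null || requested.isEmpty) Set("TABLE", "VIEW")
    else requested.asScala.toSet
}
```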

#### GetTables.scala (org.apache.kyuubi.engine.flink.operation, new file)

```diff
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, seqAsJavaListConverter}
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.table.catalog.ObjectIdentifier
+
+import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class GetTables(
+    session: Session,
+    catalog: String,
+    schema: String,
+    tableName: String,
+    tableTypes: Set[String])
+  extends FlinkOperation(OperationType.GET_TABLES, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      var catalogName = catalog
+      if (catalog == null || catalog.isEmpty) {
+        catalogName = tableEnv.getCurrentCatalog
+      }
+      val schemaPattern = toJavaRegex(schema).r.pattern
+      val tableNamePattern = toJavaRegex(tableName).r.pattern
+      var tables = List[String]()
+      val optional = tableEnv.getCatalog(catalogName)
+      if (optional.isPresent) {
+        val currCatalog = optional.get()
+        tables = currCatalog.listDatabases().asScala
+          .filter(database => schemaPattern.matcher(database).matches())
+          .flatMap { database =>
+            currCatalog.listTables(database).asScala
+              .filter(identifier => tableNamePattern.matcher(identifier).matches())
+              .filter(identifier => {
+                // only table or view
+                if (!tableTypes.contains(Constants.TABLE_TYPE) || !tableTypes.contains(
+                    Constants.VIEW_TYPE)) {
+                  // try to get table kind
+                  Try(currCatalog.getTable(ObjectIdentifier.of(
+                    catalogName,
+                    database,
+                    identifier).toObjectPath)) match {
+                    case Success(table) => tableTypes.contains(table.getTableKind.name())
+                    case Failure(_) => false
+                  }
+                } else {
+                  true
+                }
+              })
+          }.toList
+      }
+      resultSet = OperationUtil.stringListToResultSet(
+        tables.asJava,
+        Constants.SHOW_TABLES_RESULT)
+    } catch onError()
+  }
+}
```
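
The type filter above has a short-circuit worth noting: when the requested set already covers both supported kinds, every identifier passes without a per-table catalog lookup; the potentially expensive `getTable` call happens only when the client narrows to one kind. A self-contained model of that logic, with made-up names:

```scala
// Self-contained sketch of the short-circuit above; all names are illustrative.
object TableTypeFilterSketch extends App {
  // lookupKind is only invoked when the requested set does not already
  // cover both "TABLE" and "VIEW".
  def passes(requested: Set[String], lookupKind: () => Option[String]): Boolean =
    if (requested.contains("TABLE") && requested.contains("VIEW")) true
    else lookupKind().exists(requested.contains)

  println(passes(Set("TABLE", "VIEW"), () => sys.error("no lookup needed"))) // true
  println(passes(Set("VIEW"), () => Some("TABLE"))) // false: kind mismatch
  println(passes(Set("VIEW"), () => None))          // false: lookup failed
}
```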

#### FlinkOperationSuite.scala (Flink engine tests)

```diff
@@ -68,6 +68,57 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }

+  test("get tables") {
+    val table = "table_1_test"
+    val table_view = "table_1_test_view"
+
+    withJdbcStatement(table) { statement =>
+      statement.execute(
+        s"""
+           | create table $table (
+           |  id int,
+           |  name string,
+           |  price double
+           | ) with (
+           |   'connector' = 'filesystem'
+           | )
+       """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create view ${table_view}
+           | as select 1
+       """.stripMargin)
+
+      val metaData = statement.getConnection.getMetaData
+      val rs1 = metaData.getTables(null, null, null, null)
+      assert(rs1.next())
+      assert(rs1.getString(1) == table)
+      assert(rs1.next())
+      assert(rs1.getString(1) == table_view)
+
+      // get table, table name like table%
+      val rs2 = metaData.getTables(null, null, "table%", Array("TABLE"))
+      assert(rs2.next())
+      assert(rs2.getString(1) == table)
+      assert(!rs2.next())
+
+      // get view, view name like *
+      val rs3 = metaData.getTables(null, "default_database", "*", Array("VIEW"))
+      assert(rs3.next())
+      assert(rs3.getString(1) == table_view)
+
+      // get view, view name like *, schema pattern like default_%
+      val rs4 = metaData.getTables(null, "default_%", "*", Array("VIEW"))
+      assert(rs4.next())
+      assert(rs4.getString(1) == table_view)
+
+      // get view, view name like *, schema pattern like no_exists_%
+      val rs5 = metaData.getTables(null, "no_exists_%", "*", Array("VIEW"))
+      assert(!rs5.next())
+    }
+  }
+
   test("execute statement - select column name with dots") {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("select 'tmp.hello'")
```

#### SparkOperation.scala (org.apache.kyuubi.engine.spark.operation)

```diff
@@ -20,7 +20,6 @@ package org.apache.kyuubi.engine.spark.operation

 import java.io.IOException
 import java.time.ZoneId

-import org.apache.commons.lang3.StringUtils
 import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -31,8 +30,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
 import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
-import org.apache.kyuubi.engine.spark.schema.RowSet
-import org.apache.kyuubi.engine.spark.schema.SchemaHelper
+import org.apache.kyuubi.engine.spark.schema.{RowSet, SchemaHelper}
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState}
 import org.apache.kyuubi.operation.FetchOrientation._
@@ -69,29 +67,6 @@ abstract class SparkOperation(opType: OperationType, session: Session)
     }
   }

-  /**
-   * convert SQL 'like' pattern to a Java regular expression.
-   *
-   * Underscores (_) are converted to '.' and percent signs (%) are converted to '.*'.
-   *
-   * (referred to Spark's implementation: convertPattern function in file MetadataOperation.java)
-   *
-   * @param input the SQL pattern to convert
-   * @return the equivalent Java regular expression of the pattern
-   */
-  protected def toJavaRegex(input: String): String = {
-    val res =
-      if (StringUtils.isEmpty(input) || input == "*") {
-        "%"
-      } else {
-        input
-      }
-    val wStr = ".*"
-    res
-      .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
-      .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".")
-  }
-
   private val forceCancel =
     session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
```

#### AbstractOperation.scala (org.apache.kyuubi.operation)

```diff
@@ -19,6 +19,7 @@ package org.apache.kyuubi.operation

 import java.util.concurrent.Future

+import org.apache.commons.lang3.StringUtils
 import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}

 import org.apache.kyuubi.{KyuubiSQLException, Logging}
@@ -145,6 +146,29 @@ abstract class AbstractOperation(opType: OperationType, session: Session)

   override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet

+  /**
+   * convert SQL 'like' pattern to a Java regular expression.
+   *
+   * Underscores (_) are converted to '.' and percent signs (%) are converted to '.*'.
+   *
+   * (referred to Spark's implementation: convertPattern function in file MetadataOperation.java)
+   *
+   * @param input the SQL pattern to convert
+   * @return the equivalent Java regular expression of the pattern
+   */
+  protected def toJavaRegex(input: String): String = {
+    val res =
+      if (StringUtils.isEmpty(input) || input == "*") {
+        "%"
+      } else {
+        input
+      }
+    val wStr = ".*"
+    res
+      .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
+      .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".")
+  }
+
   override def getSession: Session = session

   override def getHandle: OperationHandle = handle
```
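
Since `toJavaRegex` now serves every engine that extends `AbstractOperation`, here is a standalone sketch of the conversion on sample patterns. The method body is copied from the hunk above; the surrounding object and asserts are illustrative only and assume commons-lang3 on the classpath.

```scala
import org.apache.commons.lang3.StringUtils

object ToJavaRegexSketch extends App {
  // Copied verbatim from AbstractOperation#toJavaRegex in the diff above.
  def toJavaRegex(input: String): String = {
    val res =
      if (StringUtils.isEmpty(input) || input == "*") {
        "%"
      } else {
        input
      }
    val wStr = ".*"
    res
      .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
      .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".")
  }

  assert(toJavaRegex("table%") == "table.*") // trailing '%' widens to '.*'
  assert(toJavaRegex("%table") == ".*table") // leading '%' widens to '.*'
  assert(toJavaRegex("table_1") == "table.1") // '_' matches any single character
  assert(toJavaRegex("a\\_b") == "a_b") // an escaped '_' stays a literal underscore
  assert(toJavaRegex("*") == ".*") // '*' (and the empty pattern) match everything
  println("all conversions behave as expected")
}
```

This is also why the test suite above passes "*" as the table-name pattern to match all tables: it is normalized to "%" and then widened to ".*".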