[KYUUBI #1643] Implement GetFunctions operation

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Implement GetFunctions operation.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run tests](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1749 from SteNicholas/KYUUBI-1643.

Closes #1643

442f4f1a [SteNicholas] [KYUUBI #1643] Implement GetFunctions operation
efc380ef [SteNicholas] [KYUUBI #1643] Implement GetFunctions operation
af43756b [SteNicholas] [KYUUBI #1643] Implement GetFunctions operation

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
SteNicholas 2022-01-14 17:08:19 +08:00 committed by Cheng Pan
parent b91854e33b
commit ce42d34ebb
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
6 changed files with 195 additions and 4 deletions

View File

@ -207,6 +207,8 @@ This product includes code from Apache Spark
* org.apache.kyuubi.Logging copied from classes in org.apache.spark.internal.Logging
* org.apache.kyuubi.engine.spark.FetchIterator copied from org.apache.spark.sql.hive.thriftserver.FetchIterator
* org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0 copied some methods from org.apache.spark.sql.connector.catalog.CatalogV2Implicits
* org.apache.kyuubi.engine.flink.util.StringUtils copied from classes in org.apache.spark.sql.catalyst.util.StringUtils
* org.apache.kyuubi.engine.flink.util.StringUtilsSuite copied from classes in org.apache.spark.sql.catalyst.util.StringUtilsSuite
Copyright: 2014 and onwards The Apache Software Foundation
Home page: https://spark.apache.org/

View File

@ -89,6 +89,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
session: Session,
catalogName: String,
schemaName: String,
functionName: String): Operation = null
functionName: String): Operation = {
val op = new GetFunctions(session, catalogName, schemaName, functionName)
addOperation(op)
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.operation
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.util.StringUtils.filterPattern
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
/**
 * Operation backing the JDBC/Thrift GetFunctions metadata call for the Flink engine.
 *
 * Returns one row per matching function with columns FUNCTION_CAT, FUNCTION_SCHEM and
 * FUNCTION_NAME. Built-in (system) functions are reported with a null catalog and schema;
 * catalog-registered functions carry their catalog and database names.
 *
 * @param session      the owning Kyuubi session
 * @param catalogName  exact catalog name to restrict to; empty/null means all catalogs
 * @param schemaName   schema (database) search pattern
 * @param functionName function name search pattern
 */
class GetFunctions(
session: Session,
catalogName: String,
schemaName: String,
functionName: String)
extends FlinkOperation(OperationType.GET_FUNCTIONS, session) {
override protected def runInternal(): Unit = {
try {
// NOTE(review): toJavaRegex presumably converts SQL-style patterns ('%', '_') to Java
// regex — it is inherited from FlinkOperation and not visible here; confirm semantics.
val schemaPattern = toJavaRegex(schemaName)
val functionPattern = toJavaRegex(functionName)
val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
// System functions = all listed functions minus user-defined ones; they belong to no
// catalog/schema, hence the null first two columns.
val systemFunctions = filterPattern(
tableEnv.listFunctions().diff(tableEnv.listUserDefinedFunctions()),
functionPattern)
.map { f => Row.of(null, null, f) }
// Catalog functions: walk every catalog (or only the requested one when catalogName is
// non-empty), filter its databases by the schema pattern, then its functions by name.
val catalogFunctions = tableEnv.listCatalogs()
.filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
.flatMap { c =>
val catalog = tableEnv.getCatalog(c).get()
filterPattern(catalog.listDatabases().asScala, schemaPattern)
.flatMap { d =>
filterPattern(catalog.listFunctions(d).asScala, functionPattern)
.map { f => Row.of(c, d, f) }
}
}
// Three STRING columns matching the Hive/JDBC GetFunctions result-set contract.
resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(
Column.physical(FUNCTION_CAT, DataTypes.STRING()),
Column.physical(FUNCTION_SCHEM, DataTypes.STRING()),
Column.physical(FUNCTION_NAME, DataTypes.STRING()))
.data(systemFunctions ++: catalogFunctions)
.build
} catch {
// onError() yields the shared error handler (a partial function) defined in the
// FlinkOperation base class; it records the failure on the operation state.
onError()
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.util
import java.util.regex.PatternSyntaxException
object StringUtils {

  /**
   * Filters a list of names against a simple wildcard pattern, as used when serving
   * catalog/schema/function metadata requests.
   *
   * Forked from Apache Spark's org.apache.spark.sql.catalyst.util.StringUtils.
   *
   * Only '*' (any sequence) and '|' (alternatives) are treated as wildcards; everything
   * else follows Java regular-expression conventions. Matching is case insensitive and
   * surrounding whitespace of the whole pattern is ignored. Sub-patterns that fail to
   * compile are silently skipped (best effort, mirroring Spark's behavior).
   *
   * @param names   the candidate names to filter
   * @param pattern the wildcard pattern described above
   * @return the matching names, deduplicated and sorted in natural (ascending) order
   */
  def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
    val alternatives = pattern.trim.split("\\|")
    val selected = alternatives.foldLeft(scala.collection.immutable.SortedSet.empty[String]) {
      (acc, alternative) =>
        try {
          // '(?i)' makes the match case insensitive; '*' becomes the regex '.*'.
          val regex = ("(?i)" + alternative.replaceAll("\\*", ".*")).r
          acc ++ names.filter(name => regex.pattern.matcher(name).matches())
        } catch {
          // Malformed sub-pattern: contribute nothing, keep the other alternatives.
          case _: PatternSyntaxException => acc
        }
    }
    selected.toSeq
  }
}

View File

@ -23,6 +23,9 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_CAT
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_NAME
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_SCHEM
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_TYPE
import org.apache.kyuubi.service.ServiceState._
@ -42,7 +45,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
test("get catalogs for flink sql") {
test("get catalogs") {
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val catalogs = meta.getCatalogs
@ -55,7 +58,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
test("get table type for flink sql") {
test("get table types") {
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val types = meta.getTableTypes
@ -119,6 +122,36 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
// Verifies GetFunctions metadata behavior over JDBC:
//  1) looking up each well-known built-in function by exact name yields rows with a null
//     catalog and schema (system functions belong to no catalog), and
//  2) pattern lookups ("current", "local") only ever return functions from the expected
//     built-in set.
// NOTE(review): the while-loops assert on whatever rows come back but do not assert the
// result set is non-empty — an empty result would silently pass; consider tightening.
test("get functions") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
Seq("currentTimestamp", "currentDate", "currentTime", "localTimestamp", "localTime")
.foreach { func =>
Seq(metaData.getFunctions _).foreach { apiFunc =>
val resultSet = apiFunc(null, null, func)
while (resultSet.next()) {
assert(resultSet.getString(FUNCTION_CAT) == null)
assert(resultSet.getString(FUNCTION_SCHEM) === null)
assert(resultSet.getString(FUNCTION_NAME) === func)
}
}
}
val expected =
List("currentTimestamp", "currentDate", "currentTime", "localTimestamp", "localTime")
Seq("current", "local")
.foreach { funcPattern =>
Seq(metaData.getFunctions _).foreach { apiFunc =>
val resultSet = apiFunc(null, null, funcPattern)
while (resultSet.next()) {
assert(resultSet.getString(FUNCTION_CAT) == null)
assert(resultSet.getString(FUNCTION_SCHEM) === null)
assert(expected.contains(resultSet.getString(FUNCTION_NAME)))
}
}
}
}
}
test("execute statement - select column name with dots") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 'tmp.hello'")

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.util
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.engine.flink.util.StringUtils.filterPattern
class StringUtilsSuite extends KyuubiFunSuite {

  /**
   * Forked from Apache Spark's org.apache.spark.sql.catalyst.util.StringUtilsSuite.
   * Table-driven check of wildcard filtering: each (pattern, expected) pair is run
   * against the same candidate list.
   */
  test("filter pattern") {
    val candidates = Seq("a1", "a2", "b2", "c3")
    val expectations = Seq(
      " * " -> Seq("a1", "a2", "b2", "c3"),
      "*a*" -> Seq("a1", "a2"),
      " *a* " -> Seq("a1", "a2"),
      " a* " -> Seq("a1", "a2"),
      " a.* " -> Seq("a1", "a2"),
      " B.*|a* " -> Seq("a1", "a2", "b2"),
      " a. " -> Seq("a1", "a2"),
      " d* " -> Nil)
    expectations.foreach { case (pattern, expected) =>
      assert(filterPattern(candidates, pattern) === expected)
    }
  }
}