[KYUUBI #1643][FOLLOWUP] Fix Flink GetFunctions result schema
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Implement GetFunctions operation. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1773 from SteNicholas/KYUUBI-1643. Closes #1643 498c7f99 [SteNicholas] [KYUUBI #1643] Implement GetFunctions operation Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
7ca1665bfc
commit
012fc75cad
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.kyuubi.engine.flink.operation
|
||||
|
||||
import java.sql.DatabaseMetaData
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
@ -45,7 +47,9 @@ class GetFunctions(
|
||||
val systemFunctions = filterPattern(
|
||||
tableEnv.listFunctions().diff(tableEnv.listUserDefinedFunctions()),
|
||||
functionPattern)
|
||||
.map { f => Row.of(null, null, f) }
|
||||
.map { f =>
|
||||
Row.of(null, null, f, null, Integer.valueOf(DatabaseMetaData.functionResultUnknown), null)
|
||||
}
|
||||
val catalogFunctions = tableEnv.listCatalogs()
|
||||
.filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
|
||||
.flatMap { c =>
|
||||
@ -53,14 +57,25 @@ class GetFunctions(
|
||||
filterPattern(catalog.listDatabases().asScala, schemaPattern)
|
||||
.flatMap { d =>
|
||||
filterPattern(catalog.listFunctions(d).asScala, functionPattern)
|
||||
.map { f => Row.of(c, d, f) }
|
||||
.map { f =>
|
||||
Row.of(
|
||||
c,
|
||||
d,
|
||||
f,
|
||||
null,
|
||||
Integer.valueOf(DatabaseMetaData.functionResultUnknown),
|
||||
null)
|
||||
}
|
||||
}
|
||||
}
|
||||
resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
|
||||
.columns(
|
||||
Column.physical(FUNCTION_CAT, DataTypes.STRING()),
|
||||
Column.physical(FUNCTION_SCHEM, DataTypes.STRING()),
|
||||
Column.physical(FUNCTION_NAME, DataTypes.STRING()))
|
||||
Column.physical(FUNCTION_NAME, DataTypes.STRING()),
|
||||
Column.physical(REMARKS, DataTypes.STRING()),
|
||||
Column.physical(FUNCTION_TYPE, DataTypes.INT()),
|
||||
Column.physical(SPECIFIC_NAME, DataTypes.STRING()))
|
||||
.data(systemFunctions ++: catalogFunctions)
|
||||
.build
|
||||
} catch {
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.kyuubi.engine.flink.operation
|
||||
|
||||
import java.sql.DatabaseMetaData
|
||||
|
||||
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
|
||||
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
|
||||
import org.apache.flink.table.types.logical.LogicalTypeRoot
|
||||
@ -330,14 +332,14 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
|
||||
val metaData = statement.getConnection.getMetaData
|
||||
var resultSet = metaData.getSchemas(null, null)
|
||||
while (resultSet.next()) {
|
||||
assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
|
||||
assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
|
||||
assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
|
||||
}
|
||||
resultSet = metaData.getSchemas(
|
||||
DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
|
||||
DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
|
||||
while (resultSet.next()) {
|
||||
assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
|
||||
assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
|
||||
assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
|
||||
}
|
||||
}
|
||||
@ -412,26 +414,28 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
|
||||
val metaData = statement.getConnection.getMetaData
|
||||
Seq("currentTimestamp", "currentDate", "currentTime", "localTimestamp", "localTime")
|
||||
.foreach { func =>
|
||||
Seq(metaData.getFunctions _).foreach { apiFunc =>
|
||||
val resultSet = apiFunc(null, null, func)
|
||||
while (resultSet.next()) {
|
||||
assert(resultSet.getString(FUNCTION_CAT) == null)
|
||||
assert(resultSet.getString(FUNCTION_SCHEM) === null)
|
||||
assert(resultSet.getString(FUNCTION_NAME) === func)
|
||||
}
|
||||
val resultSet = metaData.getFunctions(null, null, func)
|
||||
while (resultSet.next()) {
|
||||
assert(resultSet.getString(FUNCTION_CAT) === null)
|
||||
assert(resultSet.getString(FUNCTION_SCHEM) === null)
|
||||
assert(resultSet.getString(FUNCTION_NAME) === func)
|
||||
assert(resultSet.getString(REMARKS) === null)
|
||||
assert(resultSet.getInt(FUNCTION_TYPE) === DatabaseMetaData.functionResultUnknown)
|
||||
assert(resultSet.getString(SPECIFIC_NAME) === null)
|
||||
}
|
||||
}
|
||||
val expected =
|
||||
List("currentTimestamp", "currentDate", "currentTime", "localTimestamp", "localTime")
|
||||
Seq("current", "local")
|
||||
.foreach { funcPattern =>
|
||||
Seq(metaData.getFunctions _).foreach { apiFunc =>
|
||||
val resultSet = apiFunc(null, null, funcPattern)
|
||||
while (resultSet.next()) {
|
||||
assert(resultSet.getString(FUNCTION_CAT) == null)
|
||||
assert(resultSet.getString(FUNCTION_SCHEM) === null)
|
||||
assert(expected.contains(resultSet.getString(FUNCTION_NAME)))
|
||||
}
|
||||
val resultSet = metaData.getFunctions(null, null, funcPattern)
|
||||
while (resultSet.next()) {
|
||||
assert(resultSet.getString(FUNCTION_CAT) === null)
|
||||
assert(resultSet.getString(FUNCTION_SCHEM) === null)
|
||||
assert(expected.contains(resultSet.getString(FUNCTION_NAME)))
|
||||
assert(resultSet.getString(REMARKS) === null)
|
||||
assert(resultSet.getString(FUNCTION_TYPE) === DatabaseMetaData.functionResultUnknown)
|
||||
assert(resultSet.getString(SPECIFIC_NAME) === null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user