[KYUUBI #1645] Implement Flink engine GetSchemas operation

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Implement GetSchemas operation.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1768 from SteNicholas/KYUUBI-1645.

Closes #1645

7ff66db9 [SteNicholas] [KYUUBI #1645] Implement GetSchemas operation

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
SteNicholas 2022-01-17 10:07:22 +08:00 committed by Kent Yao
parent 36b95d333f
commit cab0762c6c
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
3 changed files with 83 additions and 1 deletions

View File

@@ -47,7 +47,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
override def newGetSchemasOperation(
session: Session,
catalog: String,
schema: String): Operation = null
schema: String): Operation = {
val op = new GetSchemas(session, catalog, schema)
addOperation(op)
}
override def newGetTablesOperation(
session: Session,

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.operation
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.util.StringUtils.filterPattern
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
/**
 * Metadata operation backing the Thrift/JDBC GetSchemas call for the Flink engine:
 * lists the databases ("schemas") of the session's catalogs, optionally restricted
 * to one catalog and filtered by a schema-name pattern.
 *
 * NOTE(review): extracted from a diff view — original indentation was flattened;
 * code tokens are kept byte-identical.
 *
 * @param session     the owning Kyuubi session
 * @param catalogName catalog to restrict to; empty/null means all catalogs
 * @param schema      schema-name pattern (SQL LIKE-style; converted via toJavaRegex)
 */
class GetSchemas(session: Session, catalogName: String, schema: String)
extends FlinkOperation(OperationType.GET_SCHEMAS, session) {
override protected def runInternal(): Unit = {
try {
// Convert the SQL-style pattern to a Java regex for filterPattern below.
// (toJavaRegex is inherited — presumably from the operation base class; confirm.)
val schemaPattern = toJavaRegex(schema)
val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
// For every catalog that matches catalogName (or all, when catalogName is
// empty), collect its databases whose names match the schema pattern.
val schemas = tableEnv.listCatalogs()
.filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
.flatMap { c =>
// .get() on the Optional is safe here only because c came from
// listCatalogs() just above — the catalog is known to exist.
val catalog = tableEnv.getCatalog(c).get()
filterPattern(catalog.listDatabases().asScala, schemaPattern)
// Row layout matches the column order declared below:
// index 0 = TABLE_SCHEM (database), index 1 = TABLE_CATALOG (catalog).
.map { d => Row.of(d, c) }
}
resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(
Column.physical(TABLE_SCHEM, DataTypes.STRING()),
Column.physical(TABLE_CATALOG, DataTypes.STRING()))
.data(schemas)
.build
} catch {
// Expression-form catch handler: onError() returns the partial function
// that records/translates the failure (inherited from FlinkOperation).
onError()
}
}
}

View File

@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
@@ -27,6 +29,8 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_CAT
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_NAME
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_SCHEM
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CATALOG
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_TYPE
import org.apache.kyuubi.service.ServiceState._
@@ -58,6 +62,24 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
// Verifies DatabaseMetaData.getSchemas against the Flink engine:
// every returned row should be the built-in default catalog/database.
// NOTE(review): extracted from a diff view — indentation was flattened.
test("get schemas") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
// Case 1: null catalog + null pattern — unfiltered listing.
var resultSet = metaData.getSchemas(null, null)
while (resultSet.next()) {
assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
}
// Case 2: filter by the leading token of the built-in names (split on "_"),
// exercising the prefix/pattern matching path of GetSchemas.
resultSet = metaData.getSchemas(
DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
while (resultSet.next()) {
assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
}
}
}
test("get table types") {
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData