diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala index 1f55fd8d8..ac73ff757 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -47,7 +47,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage override def newGetSchemasOperation( session: Session, catalog: String, - schema: String): Operation = null + schema: String): Operation = { + val op = new GetSchemas(session, catalog, schema) + addOperation(op) + } override def newGetTablesOperation( session: Session, diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala new file mode 100644 index 000000000..6699faf4b --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.flink.operation + +import scala.collection.JavaConverters._ + +import org.apache.commons.lang3.StringUtils +import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment} +import org.apache.flink.table.catalog.Column +import org.apache.flink.types.Row + +import org.apache.kyuubi.engine.flink.result.ResultSet +import org.apache.kyuubi.engine.flink.util.StringUtils.filterPattern +import org.apache.kyuubi.operation.OperationType +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ +import org.apache.kyuubi.session.Session + +class GetSchemas(session: Session, catalogName: String, schema: String) + extends FlinkOperation(OperationType.GET_SCHEMAS, session) { + + override protected def runInternal(): Unit = { + try { + val schemaPattern = toJavaRegex(schema) + val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment + val schemas = tableEnv.listCatalogs() + .filter { c => StringUtils.isEmpty(catalogName) || c == catalogName } + .flatMap { c => + val catalog = tableEnv.getCatalog(c).get() + filterPattern(catalog.listDatabases().asScala, schemaPattern) + .map { d => Row.of(d, c) } + } + resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns( + Column.physical(TABLE_SCHEM, DataTypes.STRING()), + Column.physical(TABLE_CATALOG, DataTypes.STRING())) + .data(schemas) + .build + } catch { + onError() + } + } +} diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index c11f027e0..8801a974e 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.engine.flink.operation +import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG +import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ @@ -27,6 +29,8 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_CAT import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_NAME import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_SCHEM import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CATALOG +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_TYPE import org.apache.kyuubi.service.ServiceState._ @@ -58,6 +62,24 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { } } + test("get schemas") { + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + var resultSet = metaData.getSchemas(null, null) + while (resultSet.next()) { + assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE) + assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG) + } + resultSet = metaData.getSchemas( + DEFAULT_BUILTIN_CATALOG.split("_").apply(0), + DEFAULT_BUILTIN_DATABASE.split("_").apply(0)) + while (resultSet.next()) { + assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE) + assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG) + } + } + } + test("get table types") { withJdbcStatement() { statement => val meta = statement.getConnection.getMetaData