diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetTables.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetTables.scala new file mode 100644 index 000000000..fda9c2e16 --- /dev/null +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/GetTables.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.trino.operation + +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.kyuubi.engine.trino.TrinoStatement +import org.apache.kyuubi.operation.{IterableFetchIterator, OperationType} +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.{TABLE_CAT, TABLE_NAME, TABLE_SCHEM, TABLE_TYPE} +import org.apache.kyuubi.session.Session + +class GetTables( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: Set[String]) + extends TrinoOperation(OperationType.GET_TABLES, session) { + + private val SEARCH_STRING_ESCAPE: String = "\\" + + override protected def runInternal(): Unit = { + val query = new StringBuilder( + """ + |SELECT TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS, + |TYPE_CAT, TYPE_SCHEM, TYPE_NAME, + |SELF_REFERENCING_COL_NAME, REF_GENERATION + |FROM system.jdbc.tables + |""".stripMargin) + + val filters = ArrayBuffer[String]() + if (StringUtils.isNotEmpty(catalogName)) { + filters += s"$TABLE_CAT = '$catalogName'" + } + if (StringUtils.isNotEmpty(schemaName)) { + filters += s"$TABLE_SCHEM LIKE '$schemaName' ESCAPE '$SEARCH_STRING_ESCAPE'" + } + if (StringUtils.isNotEmpty(tableName)) { + filters += s"$TABLE_NAME LIKE '$tableName' ESCAPE '$SEARCH_STRING_ESCAPE'" + } + if (tableTypes.nonEmpty) { + filters += s"(${tableTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" } + .mkString(" OR ")})" + } + + if (filters.nonEmpty) { + query.append(" WHERE ") + query.append(filters.mkString(" AND ")) + } + + try { + val trinoStatement = + TrinoStatement(trinoContext, session.sessionManager.getConf, query.toString) + schema = trinoStatement.getColumns + val resultSet = trinoStatement.execute() + iter = new IterableFetchIterator(resultSet) + } catch onError() + } +} diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala index 939fbe3ec..dc06e71fe 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala @@ -19,9 +19,10 @@ package org.apache.kyuubi.engine.trino.operation import java.util +import scala.collection.JavaConverters._ + import org.apache.kyuubi.config.KyuubiConf.OPERATION_INCREMENTAL_COLLECT -import org.apache.kyuubi.operation.Operation -import org.apache.kyuubi.operation.OperationManager +import org.apache.kyuubi.operation.{Operation, OperationManager} import org.apache.kyuubi.session.Session class TrinoOperationManager extends OperationManager("TrinoOperationManager") { @@ -57,7 +58,16 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") { catalogName: String, schemaName: String, tableName: String, - tableTypes: util.List[String]): Operation = null + tableTypes: util.List[String]): Operation = { + val tTypes = + if (tableTypes == null || tableTypes.isEmpty) { + Set("TABLE", "VIEW") + } else { + tableTypes.asScala.toSet + } + val op = new GetTables(session, catalogName, schemaName, tableName, tTypes) + addOperation(op) + } override def newGetTableTypesOperation(session: Session): Operation = { val op = new GetTableTypes(session) diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala index ac56352da..32bd5a539 100644 --- a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala +++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala @@ -71,6 +71,250 @@ class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper { } } + test("trino - get tables") { + case class TableWithCatalogAndSchema( + catalog: String, + schema: String, + tableName: String, + tableType: String) + + withJdbcStatement() { statement => + val meta = statement.getConnection.getMetaData + val resultSetBuffer = ArrayBuffer[TableWithCatalogAndSchema]() + + var tables = meta.getTables(null, null, null, null) + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "system", + "information_schema", + "tables", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "information_schema", + "tables", + "TABLE"))) + + statement.execute("CREATE SCHEMA IF NOT EXISTS memory.test_escape_1") + statement.execute("CREATE SCHEMA IF NOT EXISTS memory.test1escape_1") + statement.execute("CREATE SCHEMA IF NOT EXISTS memory.test_escape11") + statement.execute("CREATE TABLE IF NOT EXISTS memory.test_escape_1.test_escape_1(a varchar)") + statement.execute("CREATE TABLE IF NOT EXISTS memory.test1escape_1.test1escape_1(a varchar)") + statement.execute("CREATE TABLE IF NOT EXISTS memory.test_escape11.test_escape11(a varchar)") + statement.execute( + """ + |CREATE OR REPLACE VIEW memory.test_escape_1.test_view AS + |SELECT * FROM memory.test_escape_1.test_escape_1 + |""".stripMargin) + + tables = meta.getTables(null, null, null, null) + resultSetBuffer.clear() + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "system", + "information_schema", + "tables", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "information_schema", + "tables", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test1escape_1", + "test1escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape11", + "test_escape11", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_view", + "VIEW"))) + + tables = meta.getTables("memory", null, null, null) + resultSetBuffer.clear() + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "information_schema", + "tables", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test1escape_1", + "test1escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape11", + "test_escape11", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_view", + "VIEW"))) + + tables = meta.getTables(null, "test%", null, null) + resultSetBuffer.clear() + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test1escape_1", + "test1escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape11", + "test_escape11", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_view", + "VIEW"))) + + tables = meta.getTables(null, null, "test_%", null) + resultSetBuffer.clear() + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape11", + "test_escape11", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_view", + "VIEW"))) + + tables = meta.getTables(null, null, null, Array("TABLE")) + resultSetBuffer.clear() + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "system", + "information_schema", + "tables", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "information_schema", + "tables", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test1escape_1", + "test1escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape11", + "test_escape11", + "TABLE"))) + + tables = meta.getTables(null, null, "test_escape\\_1", null) + resultSetBuffer.clear() + while (tables.next()) { + resultSetBuffer += + TableWithCatalogAndSchema( + tables.getString(TABLE_CAT), + tables.getString(TABLE_SCHEM), + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test_escape_1", + "test_escape_1", + "TABLE"))) + assert(resultSetBuffer.contains(TableWithCatalogAndSchema( + "memory", + "test1escape_1", + "test1escape_1", + "TABLE"))) + + statement.execute("DROP VIEW memory.test_escape_1.test_view") + statement.execute("DROP TABLE memory.test_escape_1.test_escape_1") + statement.execute("DROP TABLE memory.test1escape_1.test1escape_1") + statement.execute("DROP TABLE memory.test_escape11.test_escape11") + statement.execute("DROP SCHEMA memory.test_escape_1") + statement.execute("DROP SCHEMA memory.test1escape_1") + statement.execute("DROP SCHEMA memory.test_escape11") + } + } + test("trino - get schemas") { case class SchemaWithCatalog(catalog: String, schema: String)