From c9d44b482cf17624d8a2e8c0ac5eab8a8e39511c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 1 Feb 2021 20:55:45 +0800 Subject: [PATCH] [KYUUBI #334] GetSchemas supports DSv2 multipart namespaces ![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![Closes #334](https://badgen.net/badge/Preview/Closes%20%23334/blue)](https://github.com/yaooqinn/kyuubi/pull/334) ![39](https://badgen.net/badge/%2B/39/red) ![8](https://badgen.net/badge/-/8/green) ![2](https://badgen.net/badge/commits/2/yellow) ![Target Issue](https://badgen.net/badge/Missing/Target%20Issue/ff0000) [❨?❩](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info) ### _Why are the changes needed?_ This PR adds support for using Spark DSv2 to get Schemas with multipart namespaces under multiple catalogs. The current works are all based on Apache Iceberg. TODOS: next step we will support get tables operation for multiple catalogs and namespaces. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate ![image](https://user-images.githubusercontent.com/8326978/106436479-176ce900-64af-11eb-9152-7fb65645e0de.png) - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #334 from yaooqinn/getschema2. 5994170 [Kent Yao] update 188258a [Kent Yao] GetSchemas supports DSv2 multipart namespaces Authored-by: Kent Yao Signed-off-by: Kent Yao --- .../kyuubi/engine/spark/shim/Shim_v3_0.scala | 20 +++++++++++++++---- .../operation/BasicIcebergJDBCTests.scala | 20 +++++++++++++++++++ .../kyuubi/operation/JDBCTestUtils.scala | 7 +++---- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala index 0dbc950af..9dc75a6dd 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala @@ -50,10 +50,22 @@ class Shim_v3_0 extends Shim_v2_4 { (catalog.defaultNamespace() ++ catalog.listNamespaces(Array()).map(_.head)).distinct schemas.filter(_.matches(schemaPattern)) case catalog: SupportsNamespaces => - // TODO: 1. We need explode here based on the impl of DSv2 - // TODO: 2. we need ensure how BI tools support multipart namespaces - val schemas = (catalog.defaultNamespace() ++ catalog.listNamespaces().map(_.head)).distinct - schemas.filter(_.matches(schemaPattern)) + val rootSchema = catalog.listNamespaces() + val allSchemas = listNamespaces(catalog, rootSchema).map(_.mkString(".")) + val schemas = (allSchemas ++: catalog.defaultNamespace().toSet) + schemas.filter(_.matches(schemaPattern)).toSeq + } + + private def listNamespaces( + catalog: SupportsNamespaces, namespaces: Array[Array[String]]): Array[Array[String]] = { + val children = namespaces.flatMap { ns => + catalog.listNamespaces(ns) + } + if (children.isEmpty) { + namespaces + } else { + namespaces ++: listNamespaces(catalog, children) + } } override def getSchemas( diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala index d525c5406..87967a57c 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/BasicIcebergJDBCTests.scala @@ -89,4 +89,24 @@ trait BasicIcebergJDBCTests extends JDBCTestUtils { checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog) } } + + test("get schemas with multipart namespaces") { + val dbs = Seq("db1", "db1.db2", "db1.db2.db3", "db4") + + withDatabases(dbs: _*) { statement => + dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db")) + val metaData = statement.getConnection.getMetaData + + val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%") + Seq(null, catalog).foreach { cg => + allPattern foreach { pattern => + checkGetSchemas( + metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog) + } + } + + checkGetSchemas(metaData.getSchemas(catalog, "db1.db2%"), + Seq("db1.db2", "db1.db2.db3"), catalog) + } + } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala index ef8fac66d..5658661eb 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala @@ -63,7 +63,7 @@ trait JDBCTestUtils extends KyuubiFunSuite { try { statements.zip(fs).foreach { case (s, f) => f(s) } } finally { - dbNames.foreach { name => + dbNames.reverse.foreach { name => statements.head.execute(s"DROP DATABASE IF EXISTS $name") } info("Closing statements") @@ -118,15 +118,14 @@ trait JDBCTestUtils extends KyuubiFunSuite { protected def checkGetSchemas( rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = { - val expected = dbNames var count = 0 while(rs.next()) { count += 1 - assert(expected.contains(rs.getString("TABLE_SCHEM"))) + assert(dbNames.contains(rs.getString("TABLE_SCHEM"))) assert(rs.getString("TABLE_CATALOG") === catalogName) } // Make sure there are no more elements assert(!rs.next()) - assert(expected.size === count, "All expected schemas should be visited") + assert(dbNames.size === count, "All expected schemas should be visited") } }