[KYUUBI #334] GetSchemas supports DSv2 multipart namespaces
 [](https://github.com/yaooqinn/kyuubi/pull/334)     [❨?❩](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT --> <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> This PR adds support for using Spark DSv2 to get schemas with multipart namespaces under multiple catalogs. The current work is all based on Apache Iceberg. TODO: as a next step, we will support the get-tables operation for multiple catalogs and namespaces. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate  - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request Closes #334 from yaooqinn/getschema2. 5994170 [Kent Yao] update 188258a [Kent Yao] GetSchemas supports DSv2 multipart namespaces Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
c169c861ff
commit
c9d44b482c
@ -50,10 +50,22 @@ class Shim_v3_0 extends Shim_v2_4 {
|
||||
(catalog.defaultNamespace() ++ catalog.listNamespaces(Array()).map(_.head)).distinct
|
||||
schemas.filter(_.matches(schemaPattern))
|
||||
case catalog: SupportsNamespaces =>
|
||||
// TODO: 1. We need explode here based on the impl of DSv2
|
||||
// TODO: 2. we need ensure how BI tools support multipart namespaces
|
||||
val schemas = (catalog.defaultNamespace() ++ catalog.listNamespaces().map(_.head)).distinct
|
||||
schemas.filter(_.matches(schemaPattern))
|
||||
val rootSchema = catalog.listNamespaces()
|
||||
val allSchemas = listNamespaces(catalog, rootSchema).map(_.mkString("."))
|
||||
val schemas = (allSchemas ++: catalog.defaultNamespace().toSet)
|
||||
schemas.filter(_.matches(schemaPattern)).toSeq
|
||||
}
|
||||
|
||||
/**
 * Collects the given namespaces together with every namespace nested beneath them.
 *
 * Each namespace in `namespaces` is passed to `catalog.listNamespaces` to find its
 * direct children; the children are then expanded the same way. The recursion stops
 * at the first level for which no children are returned.
 *
 * @param catalog    the catalog queried for child namespaces
 * @param namespaces the namespaces of the current level, each as an array of name parts
 * @return the given namespaces followed by all namespaces found on the deeper levels
 */
private def listNamespaces(
    catalog: SupportsNamespaces, namespaces: Array[Array[String]]): Array[Array[String]] = {
  val nested = namespaces.flatMap(parent => catalog.listNamespaces(parent))
  if (nested.nonEmpty) {
    // Keep the current level first, then append everything found below it.
    namespaces ++ listNamespaces(catalog, nested)
  } else {
    namespaces
  }
}
|
||||
|
||||
override def getSchemas(
|
||||
|
||||
@ -89,4 +89,24 @@ trait BasicIcebergJDBCTests extends JDBCTestUtils {
|
||||
checkGetSchemas(metaData.getSchemas(catalog, "db_not_exist"), Seq.empty, catalog)
|
||||
}
|
||||
}
|
||||
|
||||
test("get schemas with multipart namespaces") {
  val namespaces = Seq("db1", "db1.db2", "db1.db2.db3", "db4")

  withDatabases(namespaces: _*) { statement =>
    namespaces.foreach { ns =>
      statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $ns")
    }
    val metaData = statement.getConnection.getMetaData

    // Every one of these patterns is expected to match all schemas.
    val matchAllPatterns = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
    val everySchema = namespaces ++ Seq("global_temp")
    for {
      cg <- Seq(null, catalog)
      pattern <- matchAllPatterns
    } {
      checkGetSchemas(metaData.getSchemas(cg, pattern), everySchema, catalog)
    }

    // A prefix pattern is expected to select the namespace and its nested child only.
    val underDb2 = Seq("db1.db2", "db1.db2.db3")
    checkGetSchemas(metaData.getSchemas(catalog, "db1.db2%"), underDb2, catalog)
  }
}
|
||||
}
|
||||
|
||||
@ -63,7 +63,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
|
||||
try {
|
||||
statements.zip(fs).foreach { case (s, f) => f(s) }
|
||||
} finally {
|
||||
dbNames.foreach { name =>
|
||||
dbNames.reverse.foreach { name =>
|
||||
statements.head.execute(s"DROP DATABASE IF EXISTS $name")
|
||||
}
|
||||
info("Closing statements")
|
||||
@ -118,15 +118,14 @@ trait JDBCTestUtils extends KyuubiFunSuite {
|
||||
|
||||
protected def checkGetSchemas(
|
||||
rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {
|
||||
val expected = dbNames
|
||||
var count = 0
|
||||
while(rs.next()) {
|
||||
count += 1
|
||||
assert(expected.contains(rs.getString("TABLE_SCHEM")))
|
||||
assert(dbNames.contains(rs.getString("TABLE_SCHEM")))
|
||||
assert(rs.getString("TABLE_CATALOG") === catalogName)
|
||||
}
|
||||
// Make sure there are no more elements
|
||||
assert(!rs.next())
|
||||
assert(expected.size === count, "All expected schemas should be visited")
|
||||
assert(dbNames.size === count, "All expected schemas should be visited")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user