[KYUUBI #309] GetTables supports DSv2 and keeps its backward compatibility
 [](https://github.com/yaooqinn/kyuubi/pull/346)    [❨?❩](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT --> <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> fix #309 get tables through Spark DSv2 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #346 from yaooqinn/gettables. 16dbe4b [Kent Yao] nit e03b021 [Kent Yao] nit 7f3eee2 [Kent Yao] Merge branch 'master' into gettables 0180cc5 [Kent Yao] add a test 8498152 [Kent Yao] GetTables supports DSv2 and keeps its backward compatibility Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
c1a71b7d00
commit
a2818003e6
@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
import org.apache.kyuubi.engine.spark.shim.SparkShim
|
||||
import org.apache.kyuubi.operation.OperationType
|
||||
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
|
||||
import org.apache.kyuubi.session.Session
|
||||
@ -32,6 +33,6 @@ class GetTableTypes(spark: SparkSession, session: Session)
|
||||
}
|
||||
|
||||
override protected def runInternal(): Unit = {
|
||||
iter = Seq("EXTERNAL", "MANAGED", "VIEW").map(Row(_)).toList.iterator
|
||||
iter = SparkShim.sparkTableTypes.map(Row(_)).toList.iterator
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,15 +17,11 @@
|
||||
|
||||
package org.apache.kyuubi.engine.spark.operation
|
||||
|
||||
import java.util.regex.Pattern
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
import org.apache.kyuubi.engine.spark.shim.SparkShim
|
||||
import org.apache.kyuubi.operation.OperationType
|
||||
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
|
||||
import org.apache.kyuubi.session.Session
|
||||
@ -36,7 +32,7 @@ class GetTables(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
tableName: String,
|
||||
tableTypes: java.util.List[String])
|
||||
tableTypes: Set[String])
|
||||
extends SparkOperation(spark, OperationType.GET_TABLES, session) {
|
||||
|
||||
override def statement: String = {
|
||||
@ -44,15 +40,7 @@ class GetTables(
|
||||
s" [catalog: $catalog," +
|
||||
s" schemaPattern: $schema," +
|
||||
s" tablePattern: $tableName," +
|
||||
s" tableTypes: ${Option(tableTypes).map(_.asScala.mkString("(", ", ", ")")).orNull}]"
|
||||
}
|
||||
|
||||
private def matched(tableType: CatalogTableType): Boolean = {
|
||||
val commonTableType = tableType match {
|
||||
case CatalogTableType.VIEW => "VIEW"
|
||||
case _ => "TABLE"
|
||||
}
|
||||
tableTypes == null || tableTypes.isEmpty || tableTypes.contains(commonTableType)
|
||||
s" tableTypes: ${tableTypes.mkString("(", ", ", ")")}]"
|
||||
}
|
||||
|
||||
override protected def resultSchema: StructType = {
|
||||
@ -73,48 +61,22 @@ class GetTables(
|
||||
|
||||
override protected def runInternal(): Unit = {
|
||||
try {
|
||||
val catalog = spark.sessionState.catalog
|
||||
val schemaPattern = convertSchemaPattern(schema)
|
||||
val schemaPattern = convertSchemaPattern(schema, datanucleusFormat = false)
|
||||
val tablePattern = convertIdentifierPattern(tableName, datanucleusFormat = true)
|
||||
val databases = catalog.listDatabases(schemaPattern)
|
||||
val sparkShim = SparkShim()
|
||||
val catalogTablesAndViews =
|
||||
sparkShim.getCatalogTablesOrViews(spark, catalog, schemaPattern, tablePattern, tableTypes)
|
||||
|
||||
val tables = databases.flatMap { db =>
|
||||
val identifiers = catalog.listTables(db, tablePattern, includeLocalTempViews = false)
|
||||
catalog.getTablesByName(identifiers).filter(t => matched(t.tableType)).map { t =>
|
||||
Row(
|
||||
"",
|
||||
t.database,
|
||||
t.identifier.table,
|
||||
t.tableType.name,
|
||||
t.comment.getOrElse(""),
|
||||
null, null, null, null, null)
|
||||
}
|
||||
}
|
||||
|
||||
val views = if (matched(CatalogTableType.VIEW)) {
|
||||
val globalTempViewDb = catalog.globalTempViewManager.database
|
||||
(if (StringUtils.isEmpty(schema) || schema == "*"
|
||||
|| Pattern.compile(convertSchemaPattern(schema, datanucleusFormat = false))
|
||||
.matcher(globalTempViewDb).matches()) {
|
||||
catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true)
|
||||
val allTableAndViews =
|
||||
if (tableTypes.exists("VIEW".equalsIgnoreCase)) {
|
||||
catalogTablesAndViews ++
|
||||
sparkShim.getTempViews(spark, catalog, schemaPattern, tablePattern)
|
||||
} else {
|
||||
catalog.listLocalTempViews(tablePattern)
|
||||
}).map { v =>
|
||||
Row(
|
||||
"",
|
||||
v.database.orNull,
|
||||
v.table,
|
||||
CatalogTableType.VIEW.name,
|
||||
"",
|
||||
null, null, null, null, null)
|
||||
catalogTablesAndViews
|
||||
}
|
||||
} else {
|
||||
Seq.empty[Row]
|
||||
}
|
||||
iter = (tables ++ views).toList.iterator
|
||||
iter = allTableAndViews.toList.iterator
|
||||
} catch {
|
||||
onError()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -19,9 +19,12 @@ package org.apache.kyuubi.engine.spark.operation
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.engine.spark.shim.SparkShim
|
||||
import org.apache.kyuubi.operation.{Operation, OperationManager}
|
||||
import org.apache.kyuubi.session.{Session, SessionHandle}
|
||||
|
||||
@ -87,7 +90,12 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
|
||||
tableName: String,
|
||||
tableTypes: java.util.List[String]): Operation = {
|
||||
val spark = getSparkSession(session.handle)
|
||||
val op = new GetTables(spark, session, catalogName, schemaName, tableName, tableTypes)
|
||||
val tTypes = if (tableTypes == null || tableTypes.isEmpty) {
|
||||
SparkShim.sparkTableTypes
|
||||
} else {
|
||||
tableTypes.asScala.toSet
|
||||
}
|
||||
val op = new GetTables(spark, session, catalogName, schemaName, tableName, tTypes)
|
||||
addOperation(op)
|
||||
}
|
||||
|
||||
|
||||
@ -18,14 +18,16 @@
|
||||
package org.apache.kyuubi.engine.spark.shim
|
||||
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.connector.catalog.CatalogPlugin
|
||||
|
||||
class Shim_v2_4 extends SparkShim {
|
||||
|
||||
override def getCatalogs(spark: SparkSession): Seq[Row] = {
|
||||
Seq(Row(""))
|
||||
}
|
||||
override def getCatalogs(spark: SparkSession): Seq[Row] = Seq(Row(""))
|
||||
|
||||
override def catalogExists(spark: SparkSession, catalog: String): Boolean = false
|
||||
override protected def getCatalog(spark: SparkSession, catalog: String): CatalogPlugin = null
|
||||
|
||||
override protected def catalogExists(spark: SparkSession, catalog: String): Boolean = false
|
||||
|
||||
override def getSchemas(
|
||||
spark: SparkSession,
|
||||
@ -34,4 +36,54 @@ class Shim_v2_4 extends SparkShim {
|
||||
(spark.sessionState.catalog.listDatabases(schemaPattern) ++
|
||||
getGlobalTempViewManager(spark, schemaPattern)).map(Row(_, ""))
|
||||
}
|
||||
|
||||
override protected def getGlobalTempViewManager(
|
||||
spark: SparkSession, schemaPattern: String): Seq[String] = {
|
||||
val database = spark.sharedState.globalTempViewManager.database
|
||||
Option(database).filter(_.matches(schemaPattern)).toSeq
|
||||
}
|
||||
|
||||
override def getCatalogTablesOrViews(
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String,
|
||||
tablePattern: String,
|
||||
tableTypes: Set[String]): Seq[Row] = {
|
||||
val catalog = spark.sessionState.catalog
|
||||
val databases = catalog.listDatabases(schemaPattern)
|
||||
|
||||
databases.flatMap { db =>
|
||||
val identifiers = catalog.listTables(db, tablePattern, includeLocalTempViews = false)
|
||||
catalog.getTablesByName(identifiers)
|
||||
.filter(t => matched(tableTypes, t.tableType.name)).map { t =>
|
||||
val typ = if (t.tableType.name == "VIEW") "VIEW" else "TABLE"
|
||||
Row(catalogName, t.database, t.identifier.table, typ, t.comment.getOrElse(""),
|
||||
null, null, null, null, null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def getTempViews(
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String,
|
||||
tablePattern: String): Seq[Row] = {
|
||||
val views = getViews(spark, schemaPattern, tablePattern)
|
||||
views.map { ident =>
|
||||
Row(catalogName, ident.database.orNull, ident.table, "VIEW", "",
|
||||
null, null, null, null, null)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def getViews(
|
||||
spark: SparkSession,
|
||||
schemaPattern: String,
|
||||
tablePattern: String): Seq[TableIdentifier] = {
|
||||
val db = getGlobalTempViewManager(spark, schemaPattern)
|
||||
if (db.nonEmpty) {
|
||||
spark.sessionState.catalog.listTables(db.head, tablePattern)
|
||||
} else {
|
||||
spark.sessionState.catalog.listLocalTempViews(tablePattern)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
package org.apache.kyuubi.engine.spark.shim
|
||||
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogPlugin, SupportsNamespaces}
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogPlugin, SupportsNamespaces, TableCatalog}
|
||||
|
||||
class Shim_v3_0 extends Shim_v2_4 {
|
||||
|
||||
@ -37,34 +37,41 @@ class Shim_v3_0 extends Shim_v2_4 {
|
||||
(catalogs.keys ++: defaults).distinct.map(Row(_))
|
||||
}
|
||||
|
||||
override def getCatalog(spark: SparkSession, catalogName: String): CatalogPlugin = {
|
||||
val catalogManager = spark.sessionState.catalogManager
|
||||
if (catalogName == null) {
|
||||
catalogManager.currentCatalog
|
||||
} else {
|
||||
catalogManager.catalog(catalogName)
|
||||
}
|
||||
}
|
||||
|
||||
override def catalogExists(spark: SparkSession, catalog: String): Boolean = {
|
||||
spark.sessionState.catalogManager.isCatalogRegistered(catalog)
|
||||
}
|
||||
|
||||
private def getSchemas(
|
||||
catalog: CatalogPlugin,
|
||||
schemaPattern: String): Seq[String] = catalog match {
|
||||
case catalog: CatalogExtension =>
|
||||
// DSv2 does not support pass schemaPattern transparently
|
||||
val schemas =
|
||||
(catalog.defaultNamespace() ++ catalog.listNamespaces(Array()).map(_.head)).distinct
|
||||
schemas.filter(_.matches(schemaPattern))
|
||||
case catalog: SupportsNamespaces =>
|
||||
val rootSchema = catalog.listNamespaces()
|
||||
val allSchemas = listNamespaces(catalog, rootSchema).map(_.mkString("."))
|
||||
val schemas = (allSchemas ++: catalog.defaultNamespace().toSet)
|
||||
schemas.filter(_.matches(schemaPattern)).toSeq
|
||||
}
|
||||
|
||||
private def listNamespaces(
|
||||
catalog: SupportsNamespaces, namespaces: Array[Array[String]]): Array[Array[String]] = {
|
||||
private def listAllNamespaces(
|
||||
catalog: SupportsNamespaces,
|
||||
namespaces: Array[Array[String]]): Array[Array[String]] = {
|
||||
val children = namespaces.flatMap { ns =>
|
||||
catalog.listNamespaces(ns)
|
||||
}
|
||||
if (children.isEmpty) {
|
||||
namespaces.map(_.map(quoteIfNeeded))
|
||||
namespaces
|
||||
} else {
|
||||
namespaces.map(_.map(quoteIfNeeded)) ++: listNamespaces(catalog, children)
|
||||
namespaces ++: listAllNamespaces(catalog, children)
|
||||
}
|
||||
}
|
||||
|
||||
private def listAllNamespaces(catalog: CatalogPlugin): Array[Array[String]] = {
|
||||
catalog match {
|
||||
case catalog: CatalogExtension =>
|
||||
// DSv2 does not support pass schemaPattern transparently
|
||||
catalog.defaultNamespace() +: catalog.listNamespaces(Array())
|
||||
case catalog: SupportsNamespaces =>
|
||||
val rootSchema = catalog.listNamespaces()
|
||||
val allSchemas = listAllNamespaces(catalog, rootSchema)
|
||||
allSchemas
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,18 +86,63 @@ class Shim_v3_0 extends Shim_v2_4 {
|
||||
}
|
||||
}
|
||||
|
||||
private def listNamespacesWithPattern(
|
||||
catalog: CatalogPlugin, schemaPattern: String): Array[Array[String]] = {
|
||||
val p = schemaPattern.r.pattern
|
||||
listAllNamespaces(catalog).filter { ns =>
|
||||
val quoted = ns.map(quoteIfNeeded).mkString(".")
|
||||
p.matcher(quoted).matches()
|
||||
}.distinct
|
||||
}
|
||||
|
||||
private def getSchemasWithPattern(catalog: CatalogPlugin, schemaPattern: String): Seq[String] = {
|
||||
val p = schemaPattern.r.pattern
|
||||
listAllNamespaces(catalog).flatMap { ns =>
|
||||
val quoted = ns.map(quoteIfNeeded).mkString(".")
|
||||
if (p.matcher(quoted).matches()) {
|
||||
Some(quoted)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}.distinct
|
||||
}
|
||||
|
||||
override def getSchemas(
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String): Seq[Row] = {
|
||||
val viewMgr = getGlobalTempViewManager(spark, schemaPattern)
|
||||
val manager = spark.sessionState.catalogManager
|
||||
if (catalogName == null) {
|
||||
val catalog = manager.currentCatalog
|
||||
(getSchemas(catalog, schemaPattern) ++ viewMgr).map(Row(_, catalog.name()))
|
||||
} else {
|
||||
val catalogPlugin = manager.catalog(catalogName)
|
||||
(getSchemas(catalogPlugin, schemaPattern) ++ viewMgr).map(Row(_, catalogName))
|
||||
val catalog = getCatalog(spark, catalogName)
|
||||
val schemas = getSchemasWithPattern(catalog, schemaPattern)
|
||||
(schemas ++ viewMgr).map(Row(_, catalog.name()))
|
||||
}
|
||||
|
||||
override def getCatalogTablesOrViews(
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String,
|
||||
tablePattern: String,
|
||||
tableTypes: Set[String]): Seq[Row] = {
|
||||
val catalog = getCatalog(spark, catalogName)
|
||||
val schemas = listNamespacesWithPattern(catalog, schemaPattern)
|
||||
catalog match {
|
||||
case catalog if catalog.name() == SESSION_CATALOG =>
|
||||
super.getCatalogTablesOrViews(
|
||||
spark, SESSION_CATALOG, schemaPattern, tablePattern, tableTypes)
|
||||
case ce: CatalogExtension =>
|
||||
super.getCatalogTablesOrViews(spark, ce.name(), schemaPattern, tablePattern, tableTypes)
|
||||
case tc: TableCatalog =>
|
||||
schemas.flatMap { ns =>
|
||||
tc.listTables(ns)
|
||||
}.map { ident =>
|
||||
val table = tc.loadTable(ident)
|
||||
// TODO: restore view type for session catalog
|
||||
val comment = table.properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
|
||||
val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
|
||||
val tableName = quoteIfNeeded(ident.name())
|
||||
Row(catalog.name(), schema, tableName, "TABLE", comment, null, null, null, null, null)
|
||||
}
|
||||
case _ => Seq.empty[Row]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,8 @@
|
||||
package org.apache.kyuubi.engine.spark.shim
|
||||
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.connector.catalog.CatalogPlugin
|
||||
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
|
||||
@ -26,19 +28,55 @@ import org.apache.kyuubi.{Logging, Utils}
|
||||
*/
|
||||
trait SparkShim extends Logging {
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Catalog //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Get all register catalogs in Spark's `CatalogManager`
|
||||
*/
|
||||
def getCatalogs(spark: SparkSession): Seq[Row]
|
||||
|
||||
def catalogExists(spark: SparkSession, catalog: String): Boolean
|
||||
protected def getCatalog(spark: SparkSession, catalog: String): CatalogPlugin
|
||||
|
||||
protected def catalogExists(spark: SparkSession, catalog: String): Boolean
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Schema //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: String`
|
||||
*/
|
||||
def getSchemas(spark: SparkSession, catalogName: String, schemaPattern: String): Seq[Row]
|
||||
|
||||
def getGlobalTempViewManager(spark: SparkSession, schemaPattern: String): Seq[String] = {
|
||||
val database = spark.sharedState.globalTempViewManager.database
|
||||
Option(database).filter(_.matches(schemaPattern)).toSeq
|
||||
}
|
||||
protected def getGlobalTempViewManager(spark: SparkSession, schemaPattern: String): Seq[String]
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Table & View //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
def getCatalogTablesOrViews(
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String,
|
||||
tablePattern: String,
|
||||
tableTypes: Set[String]): Seq[Row]
|
||||
|
||||
def getTempViews(
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String,
|
||||
tablePattern: String): Seq[Row]
|
||||
|
||||
protected def getViews(
|
||||
spark: SparkSession,
|
||||
schemaPattern: String,
|
||||
tablePattern: String): Seq[TableIdentifier]
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Miscellaneous //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected def invoke(
|
||||
obj: Any,
|
||||
@ -66,6 +104,13 @@ trait SparkShim extends Logging {
|
||||
field.setAccessible(true)
|
||||
field.get(o)
|
||||
}
|
||||
|
||||
protected def matched(tableTypes: Set[String], tableType: String): Boolean = {
|
||||
val typ = if (tableType.equalsIgnoreCase("VIEW")) "VIEW" else "TABLE"
|
||||
tableTypes.exists(typ.equalsIgnoreCase)
|
||||
}
|
||||
|
||||
protected val SESSION_CATALOG: String = "spark_catalog"
|
||||
}
|
||||
|
||||
object SparkShim {
|
||||
@ -78,4 +123,6 @@ object SparkShim {
|
||||
case _ => throw new IllegalArgumentException(s"Not Support spark version $runtimeSparkVer")
|
||||
}
|
||||
}
|
||||
|
||||
val sparkTableTypes = Set("VIEW", "TABLE")
|
||||
}
|
||||
|
||||
@ -28,11 +28,11 @@ import org.apache.hive.service.rpc.thrift._
|
||||
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
|
||||
import org.apache.hive.service.rpc.thrift.TOperationState._
|
||||
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
|
||||
import org.apache.kyuubi.engine.spark.shim.SparkShim
|
||||
import org.apache.kyuubi.operation.JDBCTests
|
||||
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
|
||||
|
||||
@ -44,9 +44,9 @@ class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests {
|
||||
withJdbcStatement() { statement =>
|
||||
val meta = statement.getConnection.getMetaData
|
||||
val types = meta.getTableTypes
|
||||
val expected = CatalogTableType.tableTypes.toIterator
|
||||
val expected = SparkShim.sparkTableTypes.toIterator
|
||||
while (types.next()) {
|
||||
assert(types.getString(TABLE_TYPE) === expected.next().name)
|
||||
assert(types.getString(TABLE_TYPE) === expected.next())
|
||||
}
|
||||
assert(!expected.hasNext)
|
||||
assert(!types.next())
|
||||
|
||||
@ -51,7 +51,7 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
|
||||
private var serverThread: Thread = _
|
||||
protected var serverAddr: InetAddress = _
|
||||
protected var portNum: Int = _
|
||||
protected var isStarted = false
|
||||
@volatile protected var isStarted = false
|
||||
|
||||
private var authFactory: KyuubiAuthenticationFactory = _
|
||||
private var hadoopConf: Configuration = _
|
||||
|
||||
@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
|
||||
import java.nio.file.Path
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
|
||||
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
|
||||
|
||||
trait BasicIcebergJDBCTests extends JDBCTestUtils {
|
||||
|
||||
@ -63,15 +63,14 @@ trait BasicIcebergJDBCTests extends JDBCTestUtils {
|
||||
dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
|
||||
val metaData = statement.getConnection.getMetaData
|
||||
|
||||
val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
|
||||
|
||||
// The session catalog
|
||||
allPattern foreach { pattern =>
|
||||
patterns foreach { pattern =>
|
||||
checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
|
||||
}
|
||||
|
||||
Seq(null, catalog).foreach { cg =>
|
||||
allPattern foreach { pattern =>
|
||||
patterns foreach { pattern =>
|
||||
checkGetSchemas(
|
||||
metaData.getSchemas(cg, pattern), dbs ++ Seq("global_temp"), catalog)
|
||||
}
|
||||
@ -104,9 +103,8 @@ trait BasicIcebergJDBCTests extends JDBCTestUtils {
|
||||
dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
|
||||
val metaData = statement.getConnection.getMetaData
|
||||
|
||||
val allPattern = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
|
||||
Seq(null, catalog).foreach { cg =>
|
||||
allPattern foreach { pattern =>
|
||||
patterns foreach { pattern =>
|
||||
checkGetSchemas(metaData.getSchemas(cg, pattern),
|
||||
dbs ++ Seq("global_temp", "a", "db1", "db1.db2"), catalog)
|
||||
}
|
||||
@ -116,4 +114,42 @@ trait BasicIcebergJDBCTests extends JDBCTestUtils {
|
||||
Seq("db1.db2", "db1.db2.db3"), catalog)
|
||||
}
|
||||
}
|
||||
|
||||
test("get tables") {
|
||||
val dbs = Seq(
|
||||
"`a.b`",
|
||||
"`a.b`.c",
|
||||
"a.`b.c`",
|
||||
"`a.b.c`",
|
||||
"`a.b``.c`",
|
||||
"db1.db2.db3",
|
||||
"db4")
|
||||
withDatabases(dbs: _*) { statement =>
|
||||
dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
|
||||
val metaData = statement.getConnection.getMetaData
|
||||
|
||||
Seq("hadoop_prod").foreach { catalog =>
|
||||
dbs.foreach { db =>
|
||||
try {
|
||||
statement.execute(
|
||||
s"CREATE TABLE IF NOT EXISTS $catalog.$db.tbl(c STRING) USING iceberg")
|
||||
|
||||
val rs1 = metaData.getTables(catalog, db, "%", null)
|
||||
while (rs1.next()) {
|
||||
val catalogName = rs1.getString(TABLE_CAT)
|
||||
assert(catalogName === catalog)
|
||||
assert(rs1.getString(TABLE_SCHEM) === db)
|
||||
assert(rs1.getString(TABLE_NAME) === "tbl")
|
||||
assert(rs1.getString(TABLE_TYPE) == "TABLE")
|
||||
assert(rs1.getString(REMARKS) === "")
|
||||
}
|
||||
assert(!rs1.next())
|
||||
} finally {
|
||||
statement.execute(s"DROP TABLE IF EXISTS $catalog.$db.tbl")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,7 +64,7 @@ trait BasicJDBCTests extends JDBCTestUtils {
|
||||
val view_global_test = "view_2_test"
|
||||
val tables = Seq(table_test, table_external_test, view_test, view_global_test)
|
||||
val schemas = Seq("default", "default", "default", "global_temp")
|
||||
val tableTypes = Seq("MANAGED", "EXTERNAL", "VIEW", "VIEW")
|
||||
val tableTypes = Seq("TABLE", "TABLE", "VIEW", "VIEW")
|
||||
withJdbcStatement(view_test, view_global_test, table_test, view_test) { statement =>
|
||||
statement.execute(
|
||||
s"CREATE TABLE IF NOT EXISTS $table_test(key int) USING parquet COMMENT '$table_test'")
|
||||
@ -80,7 +80,8 @@ trait BasicJDBCTests extends JDBCTestUtils {
|
||||
val rs1 = metaData.getTables(null, null, null, null)
|
||||
var i = 0
|
||||
while(rs1.next()) {
|
||||
assert(rs1.getString(TABLE_CAT).isEmpty)
|
||||
val catalogName = rs1.getString(TABLE_CAT)
|
||||
assert(catalogName === "spark_catalog" || catalogName === null)
|
||||
assert(rs1.getString(TABLE_SCHEM) === schemas(i))
|
||||
assert(rs1.getString(TABLE_NAME) == tables(i))
|
||||
assert(rs1.getString(TABLE_TYPE) == tableTypes(i))
|
||||
|
||||
@ -31,6 +31,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
|
||||
|
||||
protected val dftSchema = "default"
|
||||
protected val user: String = Utils.currentUser
|
||||
protected val patterns = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
|
||||
protected def jdbcUrl: String
|
||||
|
||||
protected def withMultipleConnectionJdbcStatement(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user