[KYUUBI #4869] [AUTHZ] Introduce table extractor for ResolvedIdentifier in Spark 3.4

### _Why are the changes needed?_

- introduce ResolvedIdentifierTableExtractor for extracting table from `org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier` in Spark 3.4
- fixing ut failures w/ Spark 3.4
   -  ut CreateTable / CreateTableAsSelect / ReplaceTable / ReplaceTableAsSelect
   -  ut "Extracting table info with ResolvedDbObjectNameTableExtractor"

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4869 from bowenliang123/resolved.

Closes #4869

0bf65cd60 [liangbowen] introduce ResolvedIdentifierTableExtractor for spark 3.4

Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
liangbowen 2023-05-27 22:30:19 +08:00 committed by Cheng Pan
parent 4a4f3cc278
commit ec7e7479fa
4 changed files with 60 additions and 2 deletions

View File

@ -21,5 +21,6 @@ org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor

View File

@ -108,6 +108,15 @@
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.CreateTable",
"tableDescs" : [ {
"fieldName" : "child",
"fieldExtractor" : "ResolvedIdentifierTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "tableName",
"fieldExtractor" : "IdentifierTableExtractor",
"columnDesc" : null,
@ -134,6 +143,15 @@
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.CreateTableAsSelect",
"tableDescs" : [ {
"fieldName" : "left",
"fieldExtractor" : "ResolvedIdentifierTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "tableName",
"fieldExtractor" : "IdentifierTableExtractor",
"columnDesc" : null,
@ -432,6 +450,15 @@
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceTable",
"tableDescs" : [ {
"fieldName" : "child",
"fieldExtractor" : "ResolvedIdentifierTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "tableName",
"fieldExtractor" : "IdentifierTableExtractor",
"columnDesc" : null,
@ -458,6 +485,15 @@
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect",
"tableDescs" : [ {
"fieldName" : "left",
"fieldExtractor" : "ResolvedIdentifierTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "tableName",
"fieldExtractor" : "IdentifierTableExtractor",
"columnDesc" : null,

View File

@ -165,3 +165,16 @@ class ResolvedDbObjectNameTableExtractor extends TableExtractor {
Some(Table(catalog, Some(quote(namespace)), table, None))
}
}
/**
* org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
*/
class ResolvedIdentifierTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val catalogVal = invoke(v1, "catalog")
val catalog = new CatalogPluginCatalogExtractor().apply(catalogVal)
val identifier = invoke(v1, "identifier")
val maybeTable = new IdentifierTableExtractor().apply(spark, identifier)
maybeTable.map(_.copy(catalog = catalog))
}
}

View File

@ -30,6 +30,8 @@ object TableCommands {
val resolvedTableDesc = TableDesc("child", classOf[ResolvedTableTableExtractor])
val resolvedDbObjectNameDesc =
TableDesc("child", classOf[ResolvedDbObjectNameTableExtractor])
val resolvedIdentifierTableDesc =
TableDesc("child", classOf[ResolvedIdentifierTableExtractor])
val overwriteActionTypeDesc =
ActionTypeDesc("overwrite", classOf[OverwriteOrInsertActionTypeExtractor])
val queryQueryDesc = QueryDesc("query")
@ -205,7 +207,10 @@ object TableCommands {
"tableName",
classOf[IdentifierTableExtractor],
catalogDesc = Some(CatalogDesc()))
TableCommandSpec(cmd, Seq(tableDesc, resolvedDbObjectNameDesc), CREATETABLE)
TableCommandSpec(
cmd,
Seq(resolvedIdentifierTableDesc, tableDesc, resolvedDbObjectNameDesc),
CREATETABLE)
}
val CreateV2Table = {
@ -225,7 +230,10 @@ object TableCommands {
catalogDesc = Some(CatalogDesc()))
TableCommandSpec(
cmd,
Seq(tableDesc, resolvedDbObjectNameDesc.copy(fieldName = "left")),
Seq(
resolvedIdentifierTableDesc.copy(fieldName = "left"),
tableDesc,
resolvedDbObjectNameDesc.copy(fieldName = "left")),
CREATETABLE_AS_SELECT,
Seq(queryQueryDesc))
}