[KYUUBI #7168] Adapt PermanentViewMarker introduced by authz plugin in lineage plugin
### Why are the changes needed? Fix the lineage plugin cannot capture lineage of view after integrating authz plugin. closes #7168 ### How was this patch tested? added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #7169 from wForget/KYUUBI-7168. Closes #7168 42ac01639 [wforget] fix test 208550a3e [wforget] [KYUUBI-7168] Adapt PermanentViewMarker introduced by authz plugin in lineage plugin Authored-by: wforget <643348094@qq.com> Signed-off-by: wforget <643348094@qq.com>
This commit is contained in:
parent
6eb24bc47a
commit
4c1412bdd0
@ -391,6 +391,12 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kyuubi</groupId>
|
||||
<artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@ -37,6 +37,8 @@ import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.funsuite.AnyFunSuite
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.plugin.lineage.Lineage
|
||||
import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
|
||||
import org.apache.kyuubi.plugin.spark.authz.{AccessControlException, SparkSessionProvider}
|
||||
import org.apache.kyuubi.plugin.spark.authz.MysqlContainerEnv
|
||||
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
|
||||
@ -1513,4 +1515,31 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("Test view lineage") {
|
||||
def extractLineage(sql: String): Lineage = {
|
||||
val parsed = spark.sessionState.sqlParser.parsePlan(sql)
|
||||
val qe = spark.sessionState.executePlan(parsed)
|
||||
val analyzed = qe.analyzed
|
||||
SparkSQLLineageParseHelper(spark).transformToLineage(0, analyzed).get
|
||||
}
|
||||
|
||||
val db1 = defaultDb
|
||||
val table1 = "table1"
|
||||
val view1 = "view1"
|
||||
withSingleCallEnabled {
|
||||
withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) {
|
||||
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
|
||||
doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1"))
|
||||
|
||||
val lineage = doAs(
|
||||
admin,
|
||||
extractLineage(s"SELECT id FROM $db1.$view1 WHERE id > 1"))
|
||||
assert(lineage.inputTables.size == 1)
|
||||
assert(lineage.inputTables.head === s"spark_catalog.$db1.$table1")
|
||||
assert(lineage.columnLineage.size == 1)
|
||||
assert(lineage.columnLineage.head.originalColumns.head === s"spark_catalog.$db1.$table1.id")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -448,6 +448,13 @@ trait LineageParser {
|
||||
})
|
||||
}
|
||||
|
||||
// PermanentViewMarker is introduced by kyuubi authz plugin, which is a wrapper of View,
|
||||
// so we just extract the columns lineage from its inner children (original view)
|
||||
case pvm if pvm.nodeName == "PermanentViewMarker" =>
|
||||
pvm.innerChildren.asInstanceOf[Seq[LogicalPlan]]
|
||||
.map(extractColumnsLineage(_, parentColumnsLineage))
|
||||
.reduce(mergeColumnsLineage)
|
||||
|
||||
case p: View =>
|
||||
if (!p.isTempView && SparkContextHelper.getConf(
|
||||
LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED)) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user