[KYUUBI #4925] Add default catalog using spark_catalog with the lineage result

### _Why are the changes needed?_

close #4925

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4932 from iodone/kyuubi-4868.

Closes #4925

ff3195772 [odone] remove the catalog with atlas supporting
fda2fe321 [odone] add default catalog to v1 table when parsing lineage

Authored-by: odone <odone.zhang@gmail.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
This commit is contained in:
odone 2023-08-09 18:28:32 +08:00 committed by ulyssesyou
parent e644cd986c
commit bb671be2bd
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
6 changed files with 433 additions and 308 deletions

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import scala.collection.JavaConverters._
import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, AtlasRelatedObjectId}
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.kyuubi.plugin.lineage.Lineage
@ -87,8 +87,9 @@ object AtlasEntityHelper {
if (inputs.nonEmpty && outputs.nonEmpty) {
val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE)
val outputColumnName = buildColumnQualifiedName(columnLineage.column).get
val qualifiedName =
s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}"
s"${processEntity.getAttribute("qualifiedName")}:${outputColumnName}"
entity.setAttribute("qualifiedName", qualifiedName)
entity.setAttribute("name", qualifiedName)
entity.setRelationshipAttribute("inputs", inputs.asJava)
@ -104,33 +105,33 @@ object AtlasEntityHelper {
}
def tableObjectId(tableName: String): Option[AtlasObjectId] = {
val dbTb = tableName.split('.')
if (dbTb.length == 2) {
val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1))
// TODO parse datasource type
Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName))
} else {
None
}
buildTableQualifiedName(tableName)
.map(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", _))
}
def tableQualifiedName(cluster: String, db: String, table: String): String = {
s"${db.toLowerCase}.${table.toLowerCase}@$cluster"
def buildTableQualifiedName(tableName: String): Option[String] = {
val defaultCatalog = LineageConf.DEFAULT_CATALOG
tableName.split('.') match {
case Array(`defaultCatalog`, db, table) =>
Some(s"${db.toLowerCase}.${table.toLowerCase}@$cluster")
case _ =>
None
}
}
def columnObjectId(columnName: String): Option[AtlasObjectId] = {
val dbTbCol = columnName.split('.')
if (dbTbCol.length == 3) {
val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), dbTbCol(2))
// TODO parse datasource type
Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName))
} else {
None
}
buildColumnQualifiedName(columnName)
.map(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", _))
}
def columnQualifiedName(cluster: String, db: String, table: String, column: String): String = {
s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster"
def buildColumnQualifiedName(columnName: String): Option[String] = {
val defaultCatalog = LineageConf.DEFAULT_CATALOG
columnName.split('.') match {
case Array(`defaultCatalog`, db, table, column) =>
Some(s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster")
case _ =>
None
}
}
def objectId(entity: AtlasEntity): AtlasObjectId = {

View File

@ -199,7 +199,7 @@ trait LineageParser {
} else {
getQuery(plan)
}
val view = getField[TableIdentifier](plan, "name").unquotedString
val view = getV1TableName(getField[TableIdentifier](plan, "name").unquotedString)
extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) =>
k.withName(s"$view.${k.name}") -> v
}
@ -207,7 +207,7 @@ trait LineageParser {
case p
if p.nodeName == "CreateViewCommand"
&& getField[ViewType](plan, "viewType") == PersistedView =>
val view = getField[TableIdentifier](plan, "name").unquotedString
val view = getV1TableName(getField[TableIdentifier](plan, "name").unquotedString)
val outputCols =
getField[Seq[(String, Option[String])]](plan, "userSpecifiedColumns").map(_._1)
val query =
@ -223,7 +223,7 @@ trait LineageParser {
}
case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" =>
val table = getField[CatalogTable](plan, "table").qualifiedName
val table = getV1TableName(getField[CatalogTable](plan, "table").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -231,7 +231,7 @@ trait LineageParser {
case p
if p.nodeName == "CreateHiveTableAsSelectCommand" ||
p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" =>
val table = getField[CatalogTable](plan, "tableDesc").qualifiedName
val table = getV1TableName(getField[CatalogTable](plan, "tableDesc").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -259,7 +259,8 @@ trait LineageParser {
case p if p.nodeName == "InsertIntoDataSourceCommand" =>
val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
val table = logicalRelation.catalogTable.map(_.qualifiedName).getOrElse("")
val table = logicalRelation
.catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
k.withName(s"$table.${k.name}") -> v
@ -267,7 +268,8 @@ trait LineageParser {
case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" =>
val table =
getField[Option[CatalogTable]](plan, "catalogTable").map(_.qualifiedName)
getField[Option[CatalogTable]](plan, "catalogTable")
.map(t => getV1TableName(t.qualifiedName))
.getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
@ -286,7 +288,7 @@ trait LineageParser {
}
case p if p.nodeName == "InsertIntoHiveTable" =>
val table = getField[CatalogTable](plan, "table").qualifiedName
val table = getV1TableName(getField[CatalogTable](plan, "table").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -298,7 +300,7 @@ trait LineageParser {
if p.nodeName == "AppendData"
|| p.nodeName == "OverwriteByExpression"
|| p.nodeName == "OverwritePartitionsDynamic" =>
val table = getField[NamedRelation](plan, "table").name
val table = getV2TableName(getField[NamedRelation](plan, "table"))
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -409,22 +411,22 @@ trait LineageParser {
joinColumnsLineage(parentColumnsLineage, childrenColumnsLineage)
case p: LogicalRelation if p.catalogTable.nonEmpty =>
val tableName = p.catalogTable.get.qualifiedName
val tableName = getV1TableName(p.catalogTable.get.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: HiveTableRelation =>
val tableName = p.tableMeta.qualifiedName
val tableName = getV1TableName(p.tableMeta.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: DataSourceV2ScanRelation =>
val tableName = p.name
val tableName = getV2TableName(p)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
// For creating the view from v2 table, the logical plan of table will
// be the `DataSourceV2Relation` not the `DataSourceV2ScanRelation`.
// because the view from the table is not going to read it.
case p: DataSourceV2Relation =>
val tableName = p.name
val tableName = getV2TableName(p)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: LocalRelation =>
@ -445,7 +447,7 @@ trait LineageParser {
case p: View =>
if (!p.isTempView && SparkContextHelper.getConf(
LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED)) {
val viewName = p.desc.qualifiedName
val viewName = getV1TableName(p.desc.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(viewName))
} else {
val viewColumnsLineage =
@ -476,6 +478,27 @@ trait LineageParser {
}
private def getQuery(plan: LogicalPlan): LogicalPlan = getField[LogicalPlan](plan, "query")
private def getV2TableName(plan: NamedRelation): String = {
plan match {
case relation: DataSourceV2ScanRelation =>
val catalog = relation.relation.catalog.map(_.name()).getOrElse(LineageConf.DEFAULT_CATALOG)
val database = relation.relation.identifier.get.namespace().mkString(".")
val table = relation.relation.identifier.get.name()
s"$catalog.$database.$table"
case relation: DataSourceV2Relation =>
val catalog = relation.catalog.map(_.name()).getOrElse(LineageConf.DEFAULT_CATALOG)
val database = relation.identifier.get.namespace().mkString(".")
val table = relation.identifier.get.name()
s"$catalog.$database.$table"
case _ =>
plan.name
}
}
private def getV1TableName(qualifiedName: String): String = {
Seq(LineageConf.DEFAULT_CATALOG, qualifiedName).filter(_.nonEmpty).mkString(".")
}
}
case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends LineageParser

View File

@ -18,6 +18,7 @@
package org.apache.spark.kyuubi.lineage
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.plugin.lineage.LineageDispatcherType
@ -45,4 +46,6 @@ object LineageConf {
"Unsupported lineage dispatchers")
.createWithDefault(Seq(LineageDispatcherType.SPARK_EVENT.toString))
val DEFAULT_CATALOG: String = SQLConf.get.getConf(SQLConf.DEFAULT_CATALOG)
}

View File

@ -20,12 +20,11 @@ package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import java.util
import scala.collection.JavaConverters._
import scala.collection.immutable.List
import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
import org.apache.spark.kyuubi.lineage.LineageConf.{DEFAULT_CATALOG, DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.sql.SparkListenerExtensionTest
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@ -33,7 +32,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{buildColumnQualifiedName, buildTableQualifiedName, COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
@ -67,11 +66,17 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExten
spark.sql("create table test_table1(a string, d int)")
spark.sql("insert into test_table1 select a, b + c as d from test_table0").collect()
val expected = Lineage(
List("default.test_table0"),
List("default.test_table1"),
List(s"$DEFAULT_CATALOG.default.test_table0"),
List(s"$DEFAULT_CATALOG.default.test_table1"),
List(
("default.test_table1.a", Set("default.test_table0.a")),
("default.test_table1.d", Set("default.test_table0.b", "default.test_table0.c"))))
(
s"$DEFAULT_CATALOG.default.test_table1.a",
Set(s"$DEFAULT_CATALOG.default.test_table0.a")),
(
s"$DEFAULT_CATALOG.default.test_table1.d",
Set(
s"$DEFAULT_CATALOG.default.test_table0.b",
s"$DEFAULT_CATALOG.default.test_table0.c"))))
eventually(Timeout(5.seconds)) {
assert(mockAtlasClient.getEntities != null && mockAtlasClient.getEntities.nonEmpty)
}
@ -100,8 +105,10 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExten
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
val outputs = entity.getRelationshipAttribute("outputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs)
assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs)
assertResult(expected.inputTables
.flatMap(buildTableQualifiedName(_).toSeq))(inputs)
assertResult(expected.outputTables
.flatMap(buildTableQualifiedName(_).toSeq))(outputs)
}
def checkAtlasColumnLineageEntities(
@ -114,18 +121,20 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExten
case (entity, expectedLineage) =>
assert(entity.getTypeName == COLUMN_LINEAGE_TYPE)
val expectedQualifiedName =
s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}"
s"${processEntity.getAttribute("qualifiedName")}:" +
s"${buildColumnQualifiedName(expectedLineage.column).get}"
assert(entity.getAttribute("qualifiedName") == expectedQualifiedName)
assert(entity.getAttribute("name") == expectedQualifiedName)
val inputs = entity.getRelationshipAttribute("inputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assertResult(expectedLineage.originalColumns.map(s => s"$s@$cluster"))(inputs.toSet)
assertResult(expectedLineage.originalColumns
.flatMap(buildColumnQualifiedName(_).toSet))(inputs.toSet)
val outputs = entity.getRelationshipAttribute("outputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assert(outputs.size == 1)
assert(s"${expectedLineage.column}@$cluster" == outputs.head)
assert(buildColumnQualifiedName(expectedLineage.column).toSeq.head == outputs.head)
assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[
AtlasObjectId]) == processEntity.getAttribute("qualifiedName"))

View File

@ -19,8 +19,6 @@ package org.apache.kyuubi.plugin.lineage.events
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.immutable.List
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.lineage.LineageConf._
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
@ -82,11 +80,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
spark.sql("create table test_table0(a string, b string)")
spark.sql("select a as col0, b as col1 from test_table0").collect()
val expected = Lineage(
List("default.test_table0"),
List(s"$DEFAULT_CATALOG.default.test_table0"),
List(),
List(
("col0", Set("default.test_table0.a")),
("col1", Set("default.test_table0.b"))))
("col0", Set(s"$DEFAULT_CATALOG.default.test_table0.a")),
("col1", Set(s"$DEFAULT_CATALOG.default.test_table0.b"))))
countDownLatch.await(20, TimeUnit.SECONDS)
assert(actualSparkEventLineage == expected)
assert(actualKyuubiEventLineage == expected)
@ -97,11 +95,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
val countDownLatch = new CountDownLatch(1)
var executionId: Long = -1
val expected = Lineage(
List("default.table1", "default.table0"),
List(s"$DEFAULT_CATALOG.default.table1", s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
("aa", Set("default.table1.a")),
("bb", Set("default.table0.b"))))
("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
("bb", Set(s"$DEFAULT_CATALOG.default.table0.b"))))
spark.sparkContext.addSparkListener(new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
@ -163,11 +161,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
s" where a in ('HELLO') and c = 'HELLO'").collect()
val expected = Lineage(
List("default.t2"),
List(s"$DEFAULT_CATALOG.default.t2"),
List(),
List(
("k", Set("default.t2.a")),
("b", Set("default.t2.b"))))
("k", Set(s"$DEFAULT_CATALOG.default.t2.a")),
("b", Set(s"$DEFAULT_CATALOG.default.t2.b"))))
countDownLatch.await(20, TimeUnit.SECONDS)
assert(actual == expected)
}