diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md index 01acd884d..2dbb2a026 100644 --- a/docs/extensions/engines/spark/lineage.md +++ b/docs/extensions/engines/spark/lineage.md @@ -45,14 +45,14 @@ The lineage of this SQL: ```json { - "inputTables": ["default.test_table0"], + "inputTables": ["spark_catalog.default.test_table0"], "outputTables": [], "columnLineage": [{ "column": "col0", - "originalColumns": ["default.test_table0.a"] + "originalColumns": ["spark_catalog.default.test_table0.a"] }, { "column": "col1", - "originalColumns": ["default.test_table0.b"] + "originalColumns": ["spark_catalog.default.test_table0.b"] }] } ``` @@ -125,6 +125,7 @@ The available `spark.version`s are shown in the following table. | Spark Version | Supported | Remark | |:-------------:|:---------:|:------:| | master | √ | - | +| 3.4.x | √ | - | | 3.3.x | √ | - | | 3.2.x | √ | - | | 3.1.x | √ | - | diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala index dad37eac5..ab669aa19 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala @@ -53,7 +53,7 @@ trait LineageParser { val columnsLineage = extractColumnsLineage(plan, ListMap[Attribute, AttributeSet]()).toList.collect { case (k, attrs) => - k.name -> attrs.map(_.qualifiedName).toSet + k.name -> attrs.map(attr => (attr.qualifier :+ attr.name).mkString(".")).toSet } val (inputTables, outputTables) = columnsLineage.foldLeft((List[String](), List[String]())) { case ((inputs, outputs), (out, in)) => @@ -324,7 +324,8 @@ trait LineageParser { nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) }) val sourceColumnsLineage = extractColumnsLineage(sourceTable, nextColumnsLlineage) val targetColumnsWithTargetTable = targetColumnsLineage.values.flatten.map { column => - column.withName(s"${column.qualifiedName}") + val unquotedQualifiedName = (column.qualifier :+ column.name).mkString(".") + column.withName(unquotedQualifiedName) } ListMap(targetColumnsWithTargetTable.zip(sourceColumnsLineage.values).toSeq: _*) @@ -497,7 +498,11 @@ trait LineageParser { } private def getV1TableName(qualifiedName: String): String = { - Seq(LineageConf.DEFAULT_CATALOG, qualifiedName).filter(_.nonEmpty).mkString(".") + qualifiedName.split("\\.") match { + case Array(database, table) => + Seq(LineageConf.DEFAULT_CATALOG, database, table).filter(_.nonEmpty).mkString(".") + case _ => qualifiedName + } } }