[KYUUBI #5170] Identifier compatible with spark3.4
### _Why are the changes needed?_ close #5170 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5171 from iodone/kyuubi-5170. Closes #5170 7cd0e2cbf [odone] identifier compatible with sprk3.4 Authored-by: odone <odone.zhang@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
8b7f3c9c70
commit
fdfb8f6d66
@ -45,14 +45,14 @@ The lineage of this SQL:
|
||||
|
||||
```json
|
||||
{
|
||||
"inputTables": ["default.test_table0"],
|
||||
"inputTables": ["spark_catalog.default.test_table0"],
|
||||
"outputTables": [],
|
||||
"columnLineage": [{
|
||||
"column": "col0",
|
||||
"originalColumns": ["default.test_table0.a"]
|
||||
"originalColumns": ["spark_catalog.default.test_table0.a"]
|
||||
}, {
|
||||
"column": "col1",
|
||||
"originalColumns": ["default.test_table0.b"]
|
||||
"originalColumns": ["spark_catalog.default.test_table0.b"]
|
||||
}]
|
||||
}
|
||||
```
|
||||
@ -125,6 +125,7 @@ The available `spark.version`s are shown in the following table.
|
||||
| Spark Version | Supported | Remark |
|
||||
|:-------------:|:---------:|:------:|
|
||||
| master | √ | - |
|
||||
| 3.4.x | √ | - |
|
||||
| 3.3.x | √ | - |
|
||||
| 3.2.x | √ | - |
|
||||
| 3.1.x | √ | - |
|
||||
|
||||
@ -53,7 +53,7 @@ trait LineageParser {
|
||||
val columnsLineage =
|
||||
extractColumnsLineage(plan, ListMap[Attribute, AttributeSet]()).toList.collect {
|
||||
case (k, attrs) =>
|
||||
k.name -> attrs.map(_.qualifiedName).toSet
|
||||
k.name -> attrs.map(attr => (attr.qualifier :+ attr.name).mkString(".")).toSet
|
||||
}
|
||||
val (inputTables, outputTables) = columnsLineage.foldLeft((List[String](), List[String]())) {
|
||||
case ((inputs, outputs), (out, in)) =>
|
||||
@ -324,7 +324,8 @@ trait LineageParser {
|
||||
nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) })
|
||||
val sourceColumnsLineage = extractColumnsLineage(sourceTable, nextColumnsLlineage)
|
||||
val targetColumnsWithTargetTable = targetColumnsLineage.values.flatten.map { column =>
|
||||
column.withName(s"${column.qualifiedName}")
|
||||
val unquotedQualifiedName = (column.qualifier :+ column.name).mkString(".")
|
||||
column.withName(unquotedQualifiedName)
|
||||
}
|
||||
ListMap(targetColumnsWithTargetTable.zip(sourceColumnsLineage.values).toSeq: _*)
|
||||
|
||||
@ -497,7 +498,11 @@ trait LineageParser {
|
||||
}
|
||||
|
||||
private def getV1TableName(qualifiedName: String): String = {
|
||||
Seq(LineageConf.DEFAULT_CATALOG, qualifiedName).filter(_.nonEmpty).mkString(".")
|
||||
qualifiedName.split("\\.") match {
|
||||
case Array(database, table) =>
|
||||
Seq(LineageConf.DEFAULT_CATALOG, database, table).filter(_.nonEmpty).mkString(".")
|
||||
case _ => qualifiedName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user