[KYUUBI #4814] Introduce Apache Atlas hook support in lineage plugin

### _Why are the changes needed?_

Implements AtlasLineageDispatcher to send lineage to Apache Atlas.

close #4814

Atlas Spark Model Definition: https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json

spark process:

![1](https://github.com/apache/kyuubi/assets/17894939/28e2c68c-0ffd-4f1d-b805-a7e964f85aab)

table lineage:

![2](https://github.com/apache/kyuubi/assets/17894939/76b3db6d-ed50-42e3-97cf-2f96d4e403df)

column lineage:

![3](https://github.com/apache/kyuubi/assets/17894939/41ae6ef8-acbf-43b9-ad05-42d669c5e950)

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [X] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4815 from wForget/KYUUBI-4814.

Closes #4814

3df8a7ec9 [wforget] comments
c58eae7c5 [wforget] comments
926bcf211 [wforget] comment
e0b4067c3 [wforget] comment
e4cc3e3f8 [wforget] comments
adc72b96f [Bowen Liang] Update extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
e3bdd1c65 [Bowen Liang] Update extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
baf1711ac [Bowen Liang] Update extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
61e79f3d5 [Bowen Liang] Update extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
541df3780 [Bowen Liang] Update extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
5dd310657 [wforget] fix
cea1e137d [wforget] fix
f028d4b09 [wforget] fix
0c9b4516b [wforget] fix
6f8113032 [wforget] add close atlas client shutdown hook
3f4d2a7db [wforget] add remote user
a0db58afc [wforget] comments
6dd3c66df [wforget] comments
f2b2a30dc [wforget] style
83eb1e481 [wforget] add atlas.column.lineage.enable configuration
0719a2b65 [wforget] doc
05f936005 [wforget] fix
d169b661d [wforget] fix
6da80d742 [wforget] fix
820ae5c5f [wforget] column lineages
dabe8173e [wforget] license
f22e044d2 [wforget] test
b948bce90 [wforget] fix and add test
0aef1be6b [wforget] fix
368b5ab3f [wforget] [KYUUBI-4814] Implements AtlasLineageDispatcher to send lineage to Apache Atlas

Lead-authored-by: wforget <643348094@qq.com>
Co-authored-by: Bowen Liang <bowenliang@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
wforget 2023-06-06 17:47:19 +08:00 committed by Cheng Pan
parent 4cd00a8777
commit 408862af72
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
14 changed files with 666 additions and 8 deletions

View File

@ -185,6 +185,7 @@ The lineage dispatchers are used to dispatch lineage events, configured via `spa
<ul>
<li>SPARK_EVENT (by default): send lineage event to spark event bus</li>
<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>
<li>ATLAS: send lineage to apache atlas</li>
</ul>
#### Get Lineage Events from SparkListener
@ -207,3 +208,24 @@ spark.sparkContext.addSparkListener(new SparkListener {
#### Get Lineage Events from Kyuubi EventHandler
When using the `KYUUBI_EVENT` dispatcher, the lineage events will be sent to the Kyuubi `EventBus`. Refer to [Kyuubi Event Handler](../../server/events) to handle kyuubi events.
#### Ingest Lineage Entities to Apache Atlas
The lineage entities can be ingested into [Apache Atlas](https://atlas.apache.org/) using the `ATLAS` dispatcher.
Extra works:
+ The least transitive dependencies needed, which are under `./extensions/spark/kyuubi-spark-lineage/target/scala-${scala.binary.version}/jars`
+ Use `spark.files` to specify the `atlas-application.properties` configuration file for Atlas
Atlas Client configurations (Configure in `atlas-application.properties` or passed in `spark.atlas.` prefix):
| Name | Default Value | Description | Since |
|-----------------------------------------|------------------------|-------------------------------------------------------|-------|
| atlas.rest.address | http://localhost:21000 | The rest endpoint url for the Atlas server | 1.8.0 |
| atlas.client.type | rest | The client type (currently only supports rest) | 1.8.0 |
| atlas.client.username | none | The client username | 1.8.0 |
| atlas.client.password | none | The client password | 1.8.0 |
| atlas.cluster.name | primary | The cluster name to use in qualifiedName of entities. | 1.8.0 |
| atlas.hook.spark.column.lineage.enabled | true | Whether to ingest column lineages to Atlas. | 1.8.0 |

View File

@ -59,7 +59,85 @@
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v2</artifactId>
<version>${atlas.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -89,12 +167,6 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.lineage
import org.apache.spark.sql.execution.QueryExecution
import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher}
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasLineageDispatcher
trait LineageDispatcher {
@ -35,6 +36,7 @@ object LineageDispatcher {
LineageDispatcherType.withName(dispatcherType) match {
case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher()
case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher()
case LineageDispatcherType.ATLAS => new AtlasLineageDispatcher()
case _ => throw new UnsupportedOperationException(
s"Unsupported lineage dispatcher: $dispatcherType.")
}

View File

@ -20,5 +20,5 @@ package org.apache.kyuubi.plugin.lineage
object LineageDispatcherType extends Enumeration {
type LineageDispatcherType = Value
val SPARK_EVENT, KYUUBI_EVENT = Value
val SPARK_EVENT, KYUUBI_EVENT, ATLAS = Value
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import java.util.Locale
import com.google.common.annotations.VisibleForTesting
import org.apache.atlas.AtlasClientV2
import org.apache.atlas.model.instance.AtlasEntity
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasClientConf._
trait AtlasClient extends AutoCloseable {
def send(entities: Seq[AtlasEntity]): Unit
}
class AtlasRestClient(conf: AtlasClientConf) extends AtlasClient {
private val atlasClient: AtlasClientV2 = {
val serverUrl = conf.get(ATLAS_REST_ENDPOINT).split(",")
val username = conf.get(CLIENT_USERNAME)
val password = conf.get(CLIENT_PASSWORD)
if (StringUtils.isNoneBlank(username, password)) {
new AtlasClientV2(serverUrl, Array(username, password))
} else {
new AtlasClientV2(serverUrl: _*)
}
}
override def send(entities: Seq[AtlasEntity]): Unit = {
val entitiesWithExtInfo = new AtlasEntitiesWithExtInfo()
entities.foreach(entitiesWithExtInfo.addEntity)
atlasClient.createEntities(entitiesWithExtInfo)
}
override def close(): Unit = {
if (atlasClient != null) {
atlasClient.close()
}
}
}
object AtlasClient {
@volatile private var client: AtlasClient = _
def getClient(): AtlasClient = {
if (client == null) {
AtlasClient.synchronized {
if (client == null) {
val clientConf = AtlasClientConf.getConf()
client = clientConf.get(CLIENT_TYPE).toLowerCase(Locale.ROOT) match {
case "rest" => new AtlasRestClient(clientConf)
case unknown => throw new RuntimeException(s"Unsupported client type: $unknown.")
}
registerCleanupShutdownHook(client)
}
}
}
client
}
private def registerCleanupShutdownHook(client: AtlasClient): Unit = {
ShutdownHookManager.get.addShutdownHook(
() => {
if (client != null) {
client.close()
}
},
Integer.MAX_VALUE)
}
@VisibleForTesting
private[dispatcher] def setClient(newClient: AtlasClient): Unit = {
client = newClient
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import org.apache.atlas.ApplicationProperties
import org.apache.commons.configuration.Configuration
import org.apache.spark.kyuubi.lineage.SparkContextHelper
class AtlasClientConf(configuration: Configuration) {
def get(entry: ConfigEntry): String = {
configuration.getProperty(entry.key) match {
case s: String => s
case l: List[_] => l.mkString(",")
case o if o != null => o.toString
case _ => entry.defaultValue
}
}
}
object AtlasClientConf {
private lazy val clientConf: AtlasClientConf = {
val conf = ApplicationProperties.get()
SparkContextHelper.globalSparkContext.getConf.getAllWithPrefix("spark.atlas.")
.foreach { case (k, v) => conf.setProperty(s"atlas.$k", v) }
new AtlasClientConf(conf)
}
def getConf(): AtlasClientConf = clientConf
val ATLAS_REST_ENDPOINT = ConfigEntry("atlas.rest.address", "http://localhost:21000")
val CLIENT_TYPE = ConfigEntry("atlas.client.type", "rest")
val CLIENT_USERNAME = ConfigEntry("atlas.client.username", null)
val CLIENT_PASSWORD = ConfigEntry("atlas.client.password", null)
val CLUSTER_NAME = ConfigEntry("atlas.cluster.name", "primary")
val COLUMN_LINEAGE_ENABLED = ConfigEntry("atlas.hook.spark.column.lineage.enabled", "true")
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import scala.collection.JavaConverters._
import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, AtlasRelatedObjectId}
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.sql.execution.QueryExecution
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper
/**
* The helpers for Atlas spark entities from Lineage.
* The Atlas spark models refer to :
* https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json
*/
object AtlasEntityHelper {
/**
* Generate `spark_process` Atlas Entity from Lineage.
* @param qe
* @param lineage
* @return
*/
def processEntity(qe: QueryExecution, lineage: Lineage): AtlasEntity = {
val entity = new AtlasEntity(PROCESS_TYPE)
val appId = SparkContextHelper.globalSparkContext.applicationId
val appName = SparkContextHelper.globalSparkContext.appName match {
case "Spark shell" => s"Spark Job $appId"
case default => s"$default $appId"
}
entity.setAttribute("qualifiedName", appId)
entity.setAttribute("name", appName)
entity.setAttribute("currUser", SparkListenerHelper.currentUser)
SparkListenerHelper.sessionUser.foreach(entity.setAttribute("remoteUser", _))
entity.setAttribute("executionId", qe.id)
entity.setAttribute("details", qe.toString())
entity.setAttribute("sparkPlanDescription", qe.sparkPlan.toString())
// TODO add entity type instead of parsing from string
val inputs = lineage.inputTables.flatMap(tableObjectId).map { objId =>
relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS)
}
val outputs = lineage.outputTables.flatMap(tableObjectId).map { objId =>
relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)
}
entity.setRelationshipAttribute("inputs", inputs.asJava)
entity.setRelationshipAttribute("outputs", outputs.asJava)
entity
}
/**
* Generate `spark_column_lineage` Atlas Entity from Lineage.
* @param processEntity
* @param lineage
* @return
*/
def columnLineageEntities(processEntity: AtlasEntity, lineage: Lineage): Seq[AtlasEntity] = {
lineage.columnLineage.flatMap(columnLineage => {
val inputs = columnLineage.originalColumns.flatMap(columnObjectId).map { objId =>
relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS)
}
val outputs = Option(columnLineage.column).flatMap(columnObjectId).map { objId =>
relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)
}.toSeq
if (inputs.nonEmpty && outputs.nonEmpty) {
val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE)
val qualifiedName =
s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}"
entity.setAttribute("qualifiedName", qualifiedName)
entity.setAttribute("name", qualifiedName)
entity.setRelationshipAttribute("inputs", inputs.asJava)
entity.setRelationshipAttribute("outputs", outputs.asJava)
entity.setRelationshipAttribute(
"process",
relatedObjectId(objectId(processEntity), RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE))
Some(entity)
} else {
None
}
})
}
def tableObjectId(tableName: String): Option[AtlasObjectId] = {
val dbTb = tableName.split('.')
if (dbTb.length == 2) {
val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1))
// TODO parse datasource type
Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName))
} else {
None
}
}
def tableQualifiedName(cluster: String, db: String, table: String): String = {
s"${db.toLowerCase}.${table.toLowerCase}@$cluster"
}
def columnObjectId(columnName: String): Option[AtlasObjectId] = {
val dbTbCol = columnName.split('.')
if (dbTbCol.length == 3) {
val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), dbTbCol(2))
// TODO parse datasource type
Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName))
} else {
None
}
}
def columnQualifiedName(cluster: String, db: String, table: String, column: String): String = {
s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster"
}
def objectId(entity: AtlasEntity): AtlasObjectId = {
val objId = new AtlasObjectId(entity.getGuid, entity.getTypeName)
objId.setUniqueAttributes(Map("qualifiedName" -> entity.getAttribute("qualifiedName")).asJava)
objId
}
def relatedObjectId(objectId: AtlasObjectId, relationshipType: String): AtlasRelatedObjectId = {
new AtlasRelatedObjectId(objectId, relationshipType)
}
lazy val cluster = AtlasClientConf.getConf().get(AtlasClientConf.CLUSTER_NAME)
lazy val columnLineageEnabled =
AtlasClientConf.getConf().get(AtlasClientConf.COLUMN_LINEAGE_ENABLED).toBoolean
val HIVE_TABLE_TYPE = "hive_table"
val HIVE_COLUMN_TYPE = "hive_column"
val PROCESS_TYPE = "spark_process"
val COLUMN_LINEAGE_TYPE = "spark_column_lineage"
val RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs"
val RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs"
val RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE = "spark_process_column_lineages"
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.columnLineageEnabled
class AtlasLineageDispatcher extends LineageDispatcher with Logging {
override def send(qe: QueryExecution, lineageOpt: Option[Lineage]): Unit = {
try {
lineageOpt.filter(l => l.inputTables.nonEmpty || l.outputTables.nonEmpty).foreach(lineage => {
val processEntity = AtlasEntityHelper.processEntity(qe, lineage)
val columnLineageEntities = if (lineage.columnLineage.nonEmpty && columnLineageEnabled) {
AtlasEntityHelper.columnLineageEntities(processEntity, lineage)
} else {
Seq.empty
}
AtlasClient.getClient().send(processEntity +: columnLineageEntities)
})
} catch {
case t: Throwable =>
logWarning("Send lineage to atlas failed.", t)
}
}
override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
// ignore
}
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
case class ConfigEntry(key: String, defaultValue: String)

View File

@ -17,7 +17,9 @@
package org.apache.kyuubi.plugin.lineage.helper
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SPARK_VERSION
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.kyuubi.util.SemanticVersion
@ -40,4 +42,11 @@ object SparkListenerHelper {
def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
}
def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName
def sessionUser: Option[String] =
Option(SparkContextHelper.globalSparkContext.getLocalProperty(KYUUBI_SESSION_USER))
final val KYUUBI_SESSION_USER = "kyuubi.session.user"
}

View File

@ -35,6 +35,7 @@ object LineageConf {
"`org.apache.kyuubi.plugin.lineage.LineageDispatcher` for dispatching lineage events.<ul>" +
"<li>SPARK_EVENT: send lineage event to spark event bus</li>" +
"<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>" +
"<li>ATLAS: send lineage to apache atlas</li>" +
"</ul>")
.version("1.8.0")
.stringConf

View File

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
atlas.cluster.name=test

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import java.util
import scala.collection.JavaConverters._
import scala.collection.immutable.List
import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.sql.SparkListenerExtensionTest
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
val catalogName =
if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog"
else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
override protected val catalogImpl: String = "hive"
override def sparkConf(): SparkConf = {
super.sparkConf()
.set("spark.sql.catalog.v2_catalog", catalogName)
.set(
"spark.sql.queryExecutionListeners",
"org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
.set(DISPATCHERS.key, "ATLAS")
.set(SKIP_PARSING_PERMANENT_VIEW_ENABLED.key, "true")
}
override def afterAll(): Unit = {
spark.stop()
super.afterAll()
}
test("altas lineage capture: insert into select sql") {
val mockAtlasClient = new MockAtlasClient()
AtlasClient.setClient(mockAtlasClient)
withTable("test_table0") { _ =>
spark.sql("create table test_table0(a string, b int, c int)")
spark.sql("create table test_table1(a string, d int)")
spark.sql("insert into test_table1 select a, b + c as d from test_table0").collect()
val expected = Lineage(
List("default.test_table0"),
List("default.test_table1"),
List(
("default.test_table1.a", Set("default.test_table0.a")),
("default.test_table1.d", Set("default.test_table0.b", "default.test_table0.c"))))
eventually(Timeout(5.seconds)) {
assert(mockAtlasClient.getEntities != null && mockAtlasClient.getEntities.nonEmpty)
}
checkAtlasProcessEntity(mockAtlasClient.getEntities.head, expected)
checkAtlasColumnLineageEntities(
mockAtlasClient.getEntities.head,
mockAtlasClient.getEntities.tail,
expected)
}
}
def checkAtlasProcessEntity(entity: AtlasEntity, expected: Lineage): Unit = {
assert(entity.getTypeName == PROCESS_TYPE)
val appId = SparkContextHelper.globalSparkContext.applicationId
assert(entity.getAttribute("qualifiedName") == appId)
assert(entity.getAttribute("name")
== s"${SparkContextHelper.globalSparkContext.appName} $appId")
assert(StringUtils.isNotBlank(entity.getAttribute("currUser").asInstanceOf[String]))
assert(entity.getAttribute("executionId") != null)
assert(StringUtils.isNotBlank(entity.getAttribute("details").asInstanceOf[String]))
assert(StringUtils.isNotBlank(entity.getAttribute("sparkPlanDescription").asInstanceOf[String]))
val inputs = entity.getRelationshipAttribute("inputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
val outputs = entity.getRelationshipAttribute("outputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs)
assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs)
}
def checkAtlasColumnLineageEntities(
processEntity: AtlasEntity,
entities: Seq[AtlasEntity],
expected: Lineage): Unit = {
assert(entities.size == expected.columnLineage.size)
entities.zip(expected.columnLineage).foreach {
case (entity, expectedLineage) =>
assert(entity.getTypeName == COLUMN_LINEAGE_TYPE)
val expectedQualifiedName =
s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}"
assert(entity.getAttribute("qualifiedName") == expectedQualifiedName)
assert(entity.getAttribute("name") == expectedQualifiedName)
val inputs = entity.getRelationshipAttribute("inputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assertResult(expectedLineage.originalColumns.map(s => s"$s@$cluster"))(inputs.toSet)
val outputs = entity.getRelationshipAttribute("outputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assert(outputs.size == 1)
assert(s"${expectedLineage.column}@$cluster" == outputs.head)
assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[
AtlasObjectId]) == processEntity.getAttribute("qualifiedName"))
}
}
// Pre-set cluster name for testing in `test/resources/atlas-application.properties`
private val cluster = "test"
def getQualifiedName(objId: AtlasObjectId): String = {
objId.getUniqueAttributes.get("qualifiedName").asInstanceOf[String]
}
class MockAtlasClient() extends AtlasClient {
private var _entities: Seq[AtlasEntity] = _
override def send(entities: Seq[AtlasEntity]): Unit = {
_entities = entities
}
def getEntities: Seq[AtlasEntity] = _entities
override def close(): Unit = {}
}
}

View File

@ -123,6 +123,7 @@
<antlr4.version>4.9.3</antlr4.version>
<antlr.st4.version>4.3.4</antlr.st4.version>
<apache.archive.dist>https://archive.apache.org/dist</apache.archive.dist>
<atlas.version>2.3.0</atlas.version>
<bouncycastle.version>1.67</bouncycastle.version>
<codahale.metrics.version>4.2.8</codahale.metrics.version>
<commons-cli.version>1.5.0</commons-cli.version>