diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md index 665929e9f..01acd884d 100644 --- a/docs/extensions/engines/spark/lineage.md +++ b/docs/extensions/engines/spark/lineage.md @@ -185,6 +185,7 @@ The lineage dispatchers are used to dispatch lineage events, configured via `spa #### Get Lineage Events from SparkListener @@ -207,3 +208,24 @@ spark.sparkContext.addSparkListener(new SparkListener { #### Get Lineage Events from Kyuubi EventHandler When using the `KYUUBI_EVENT` dispatcher, the lineage events will be sent to the Kyuubi `EventBus`. Refer to [Kyuubi Event Handler](../../server/events) to handle kyuubi events. + +#### Ingest Lineage Entities to Apache Atlas + +The lineage entities can be ingested into [Apache Atlas](https://atlas.apache.org/) using the `ATLAS` dispatcher. + +Extra works: + ++ The least transitive dependencies needed, which are under `./extensions/spark/kyuubi-spark-lineage/target/scala-${scala.binary.version}/jars` ++ Use `spark.files` to specify the `atlas-application.properties` configuration file for Atlas + +Atlas Client configurations (Configure in `atlas-application.properties` or passed in `spark.atlas.` prefix): + +| Name | Default Value | Description | Since | +|-----------------------------------------|------------------------|-------------------------------------------------------|-------| +| atlas.rest.address | http://localhost:21000 | The rest endpoint url for the Atlas server | 1.8.0 | +| atlas.client.type | rest | The client type (currently only supports rest) | 1.8.0 | +| atlas.client.username | none | The client username | 1.8.0 | +| atlas.client.password | none | The client password | 1.8.0 | +| atlas.cluster.name | primary | The cluster name to use in qualifiedName of entities. | 1.8.0 | +| atlas.hook.spark.column.lineage.enabled | true | Whether to ingest column lineages to Atlas. | 1.8.0 | + diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml b/extensions/spark/kyuubi-spark-lineage/pom.xml index 2a7ba7773..760b0cc08 100644 --- a/extensions/spark/kyuubi-spark-lineage/pom.xml +++ b/extensions/spark/kyuubi-spark-lineage/pom.xml @@ -59,7 +59,85 @@ commons-collections commons-collections - test + provided + + + + com.google.guava + guava + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-core + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + org.apache.httpcomponents + httpclient + provided + + + + commons-lang + commons-lang + provided + + + + org.apache.commons + commons-lang3 + provided + + + + org.apache.atlas + atlas-client-v2 + ${atlas.version} + + + org.slf4j + slf4j-log4j12 + + + org.slf4j + slf4j-api + + + org.slf4j + jul-to-slf4j + + + commons-logging + commons-logging + + + org.apache.hadoop + hadoop-common + + + org.springframework + spring-context + + + org.apache.commons + commons-text + + @@ -89,12 +167,6 @@ spark-hive_${scala.binary.version} test - - - com.google.guava - guava - test - diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala index 8f5dc0d9e..b993f1428 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.lineage import org.apache.spark.sql.execution.QueryExecution import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher} +import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasLineageDispatcher trait LineageDispatcher { @@ -35,6 +36,7 @@ object LineageDispatcher { LineageDispatcherType.withName(dispatcherType) match { case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher() case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher() + case LineageDispatcherType.ATLAS => new AtlasLineageDispatcher() case _ => throw new UnsupportedOperationException( s"Unsupported lineage dispatcher: $dispatcherType.") } diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala index d6afea152..8e07f6d77 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala @@ -20,5 +20,5 @@ package org.apache.kyuubi.plugin.lineage object LineageDispatcherType extends Enumeration { type LineageDispatcherType = Value - val SPARK_EVENT, KYUUBI_EVENT = Value + val SPARK_EVENT, KYUUBI_EVENT, ATLAS = Value } diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala new file mode 100644 index 000000000..15b127182 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher.atlas + +import java.util.Locale + +import com.google.common.annotations.VisibleForTesting +import org.apache.atlas.AtlasClientV2 +import org.apache.atlas.model.instance.AtlasEntity +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.util.ShutdownHookManager + +import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasClientConf._ + +trait AtlasClient extends AutoCloseable { + def send(entities: Seq[AtlasEntity]): Unit +} + +class AtlasRestClient(conf: AtlasClientConf) extends AtlasClient { + + private val atlasClient: AtlasClientV2 = { + val serverUrl = conf.get(ATLAS_REST_ENDPOINT).split(",") + val username = conf.get(CLIENT_USERNAME) + val password = conf.get(CLIENT_PASSWORD) + if (StringUtils.isNoneBlank(username, password)) { + new AtlasClientV2(serverUrl, Array(username, password)) + } else { + new AtlasClientV2(serverUrl: _*) + } + } + + override def send(entities: Seq[AtlasEntity]): Unit = { + val entitiesWithExtInfo = new AtlasEntitiesWithExtInfo() + entities.foreach(entitiesWithExtInfo.addEntity) + atlasClient.createEntities(entitiesWithExtInfo) + } + + override def close(): Unit = { + if (atlasClient != null) { + atlasClient.close() + } + } +} + +object AtlasClient { + + @volatile private var client: AtlasClient = _ + + def getClient(): AtlasClient = { + if (client == null) { + AtlasClient.synchronized { + if (client == null) { + val clientConf = AtlasClientConf.getConf() + client = clientConf.get(CLIENT_TYPE).toLowerCase(Locale.ROOT) match { + case "rest" => new AtlasRestClient(clientConf) + case unknown => throw new RuntimeException(s"Unsupported client type: $unknown.") + } + registerCleanupShutdownHook(client) + } + } + } + client + } + + private def registerCleanupShutdownHook(client: AtlasClient): Unit = { + ShutdownHookManager.get.addShutdownHook( + () => { + if (client != null) { + client.close() + } + }, + Integer.MAX_VALUE) + } + + @VisibleForTesting + private[dispatcher] def setClient(newClient: AtlasClient): Unit = { + client = newClient + } + +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala new file mode 100644 index 000000000..03b1a83e0 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher.atlas + +import org.apache.atlas.ApplicationProperties +import org.apache.commons.configuration.Configuration +import org.apache.spark.kyuubi.lineage.SparkContextHelper + +class AtlasClientConf(configuration: Configuration) { + + def get(entry: ConfigEntry): String = { + configuration.getProperty(entry.key) match { + case s: String => s + case l: List[_] => l.mkString(",") + case o if o != null => o.toString + case _ => entry.defaultValue + } + } + +} + +object AtlasClientConf { + + private lazy val clientConf: AtlasClientConf = { + val conf = ApplicationProperties.get() + SparkContextHelper.globalSparkContext.getConf.getAllWithPrefix("spark.atlas.") + .foreach { case (k, v) => conf.setProperty(s"atlas.$k", v) } + new AtlasClientConf(conf) + } + + def getConf(): AtlasClientConf = clientConf + + val ATLAS_REST_ENDPOINT = ConfigEntry("atlas.rest.address", "http://localhost:21000") + + val CLIENT_TYPE = ConfigEntry("atlas.client.type", "rest") + val CLIENT_USERNAME = ConfigEntry("atlas.client.username", null) + val CLIENT_PASSWORD = ConfigEntry("atlas.client.password", null) + + val CLUSTER_NAME = ConfigEntry("atlas.cluster.name", "primary") + + val COLUMN_LINEAGE_ENABLED = ConfigEntry("atlas.hook.spark.column.lineage.enabled", "true") +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala new file mode 100644 index 000000000..9575b5258 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher.atlas + +import scala.collection.JavaConverters._ + +import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, AtlasRelatedObjectId} +import org.apache.spark.kyuubi.lineage.SparkContextHelper +import org.apache.spark.sql.execution.QueryExecution + +import org.apache.kyuubi.plugin.lineage.Lineage +import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper + +/** + * The helpers for Atlas spark entities from Lineage. + * The Atlas spark models refer to : + * https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json + */ +object AtlasEntityHelper { + + /** + * Generate `spark_process` Atlas Entity from Lineage. + * @param qe + * @param lineage + * @return + */ + def processEntity(qe: QueryExecution, lineage: Lineage): AtlasEntity = { + val entity = new AtlasEntity(PROCESS_TYPE) + + val appId = SparkContextHelper.globalSparkContext.applicationId + val appName = SparkContextHelper.globalSparkContext.appName match { + case "Spark shell" => s"Spark Job $appId" + case default => s"$default $appId" + } + + entity.setAttribute("qualifiedName", appId) + entity.setAttribute("name", appName) + entity.setAttribute("currUser", SparkListenerHelper.currentUser) + SparkListenerHelper.sessionUser.foreach(entity.setAttribute("remoteUser", _)) + entity.setAttribute("executionId", qe.id) + entity.setAttribute("details", qe.toString()) + entity.setAttribute("sparkPlanDescription", qe.sparkPlan.toString()) + + // TODO add entity type instead of parsing from string + val inputs = lineage.inputTables.flatMap(tableObjectId).map { objId => + relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS) + } + val outputs = lineage.outputTables.flatMap(tableObjectId).map { objId => + relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS) + } + + entity.setRelationshipAttribute("inputs", inputs.asJava) + entity.setRelationshipAttribute("outputs", outputs.asJava) + + entity + } + + /** + * Generate `spark_column_lineage` Atlas Entity from Lineage. + * @param processEntity + * @param lineage + * @return + */ + def columnLineageEntities(processEntity: AtlasEntity, lineage: Lineage): Seq[AtlasEntity] = { + lineage.columnLineage.flatMap(columnLineage => { + val inputs = columnLineage.originalColumns.flatMap(columnObjectId).map { objId => + relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS) + } + val outputs = Option(columnLineage.column).flatMap(columnObjectId).map { objId => + relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS) + }.toSeq + + if (inputs.nonEmpty && outputs.nonEmpty) { + val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE) + val qualifiedName = + s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}" + entity.setAttribute("qualifiedName", qualifiedName) + entity.setAttribute("name", qualifiedName) + entity.setRelationshipAttribute("inputs", inputs.asJava) + entity.setRelationshipAttribute("outputs", outputs.asJava) + entity.setRelationshipAttribute( + "process", + relatedObjectId(objectId(processEntity), RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE)) + Some(entity) + } else { + None + } + }) + } + + def tableObjectId(tableName: String): Option[AtlasObjectId] = { + val dbTb = tableName.split('.') + if (dbTb.length == 2) { + val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1)) + // TODO parse datasource type + Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName)) + } else { + None + } + } + + def tableQualifiedName(cluster: String, db: String, table: String): String = { + s"${db.toLowerCase}.${table.toLowerCase}@$cluster" + } + + def columnObjectId(columnName: String): Option[AtlasObjectId] = { + val dbTbCol = columnName.split('.') + if (dbTbCol.length == 3) { + val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), dbTbCol(2)) + // TODO parse datasource type + Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName)) + } else { + None + } + } + + def columnQualifiedName(cluster: String, db: String, table: String, column: String): String = { + s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster" + } + + def objectId(entity: AtlasEntity): AtlasObjectId = { + val objId = new AtlasObjectId(entity.getGuid, entity.getTypeName) + objId.setUniqueAttributes(Map("qualifiedName" -> entity.getAttribute("qualifiedName")).asJava) + objId + } + + def relatedObjectId(objectId: AtlasObjectId, relationshipType: String): AtlasRelatedObjectId = { + new AtlasRelatedObjectId(objectId, relationshipType) + } + + lazy val cluster = AtlasClientConf.getConf().get(AtlasClientConf.CLUSTER_NAME) + lazy val columnLineageEnabled = + AtlasClientConf.getConf().get(AtlasClientConf.COLUMN_LINEAGE_ENABLED).toBoolean + + val HIVE_TABLE_TYPE = "hive_table" + val HIVE_COLUMN_TYPE = "hive_column" + val PROCESS_TYPE = "spark_process" + val COLUMN_LINEAGE_TYPE = "spark_column_lineage" + val RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs" + val RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs" + val RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE = "spark_process_column_lineages" + +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala new file mode 100644 index 000000000..c66b51107 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher.atlas + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.QueryExecution + +import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher} +import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.columnLineageEnabled + +class AtlasLineageDispatcher extends LineageDispatcher with Logging { + + override def send(qe: QueryExecution, lineageOpt: Option[Lineage]): Unit = { + try { + lineageOpt.filter(l => l.inputTables.nonEmpty || l.outputTables.nonEmpty).foreach(lineage => { + val processEntity = AtlasEntityHelper.processEntity(qe, lineage) + val columnLineageEntities = if (lineage.columnLineage.nonEmpty && columnLineageEnabled) { + AtlasEntityHelper.columnLineageEntities(processEntity, lineage) + } else { + Seq.empty + } + AtlasClient.getClient().send(processEntity +: columnLineageEntities) + }) + } catch { + case t: Throwable => + logWarning("Send lineage to atlas failed.", t) + } + } + + override def onFailure(qe: QueryExecution, exception: Exception): Unit = { + // ignore + } + +} diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala new file mode 100644 index 000000000..3f9d9831d --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher.atlas + +case class ConfigEntry(key: String, defaultValue: String) diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala index 57db55a1e..a1747493e 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala @@ -17,7 +17,9 @@ package org.apache.kyuubi.plugin.lineage.helper +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SPARK_VERSION +import org.apache.spark.kyuubi.lineage.SparkContextHelper import org.apache.kyuubi.util.SemanticVersion @@ -40,4 +42,11 @@ object SparkListenerHelper { def isSparkVersionEqualTo(targetVersionString: String): Boolean = { SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString) } + + def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName + + def sessionUser: Option[String] = + Option(SparkContextHelper.globalSparkContext.getLocalProperty(KYUUBI_SESSION_USER)) + + final val KYUUBI_SESSION_USER = "kyuubi.session.user" } diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala index 6fb5399c0..5b7d3dfe1 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala @@ -35,6 +35,7 @@ object LineageConf { "`org.apache.kyuubi.plugin.lineage.LineageDispatcher` for dispatching lineage events.") .version("1.8.0") .stringConf diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties b/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties new file mode 100644 index 000000000..e6dc52f98 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +atlas.cluster.name=test diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala new file mode 100644 index 000000000..cb98c52ef --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage.dispatcher.atlas + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.immutable.List + +import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId} +import org.apache.commons.lang3.StringUtils +import org.apache.spark.SparkConf +import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED} +import org.apache.spark.kyuubi.lineage.SparkContextHelper +import org.apache.spark.sql.SparkListenerExtensionTest +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar._ + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.plugin.lineage.Lineage +import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE, PROCESS_TYPE} +import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost + +class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest { + val catalogName = + if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog" + else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog" + + override protected val catalogImpl: String = "hive" + + override def sparkConf(): SparkConf = { + super.sparkConf() + .set("spark.sql.catalog.v2_catalog", catalogName) + .set( + "spark.sql.queryExecutionListeners", + "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener") + .set(DISPATCHERS.key, "ATLAS") + .set(SKIP_PARSING_PERMANENT_VIEW_ENABLED.key, "true") + } + + override def afterAll(): Unit = { + spark.stop() + super.afterAll() + } + + test("altas lineage capture: insert into select sql") { + val mockAtlasClient = new MockAtlasClient() + AtlasClient.setClient(mockAtlasClient) + + withTable("test_table0") { _ => + spark.sql("create table test_table0(a string, b int, c int)") + spark.sql("create table test_table1(a string, d int)") + spark.sql("insert into test_table1 select a, b + c as d from test_table0").collect() + val expected = Lineage( + List("default.test_table0"), + List("default.test_table1"), + List( + ("default.test_table1.a", Set("default.test_table0.a")), + ("default.test_table1.d", Set("default.test_table0.b", "default.test_table0.c")))) + eventually(Timeout(5.seconds)) { + assert(mockAtlasClient.getEntities != null && mockAtlasClient.getEntities.nonEmpty) + } + checkAtlasProcessEntity(mockAtlasClient.getEntities.head, expected) + checkAtlasColumnLineageEntities( + mockAtlasClient.getEntities.head, + mockAtlasClient.getEntities.tail, + expected) + } + + } + + def checkAtlasProcessEntity(entity: AtlasEntity, expected: Lineage): Unit = { + assert(entity.getTypeName == PROCESS_TYPE) + + val appId = SparkContextHelper.globalSparkContext.applicationId + assert(entity.getAttribute("qualifiedName") == appId) + assert(entity.getAttribute("name") + == s"${SparkContextHelper.globalSparkContext.appName} $appId") + assert(StringUtils.isNotBlank(entity.getAttribute("currUser").asInstanceOf[String])) + assert(entity.getAttribute("executionId") != null) + assert(StringUtils.isNotBlank(entity.getAttribute("details").asInstanceOf[String])) + assert(StringUtils.isNotBlank(entity.getAttribute("sparkPlanDescription").asInstanceOf[String])) + + val inputs = entity.getRelationshipAttribute("inputs") + .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName) + val outputs = entity.getRelationshipAttribute("outputs") + .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName) + assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs) + assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs) + } + + def checkAtlasColumnLineageEntities( + processEntity: AtlasEntity, + entities: Seq[AtlasEntity], + expected: Lineage): Unit = { + assert(entities.size == expected.columnLineage.size) + + entities.zip(expected.columnLineage).foreach { + case (entity, expectedLineage) => + assert(entity.getTypeName == COLUMN_LINEAGE_TYPE) + val expectedQualifiedName = + s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}" + assert(entity.getAttribute("qualifiedName") == expectedQualifiedName) + assert(entity.getAttribute("name") == expectedQualifiedName) + + val inputs = entity.getRelationshipAttribute("inputs") + .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName) + assertResult(expectedLineage.originalColumns.map(s => s"$s@$cluster"))(inputs.toSet) + + val outputs = entity.getRelationshipAttribute("outputs") + .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName) + assert(outputs.size == 1) + assert(s"${expectedLineage.column}@$cluster" == outputs.head) + + assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[ + AtlasObjectId]) == processEntity.getAttribute("qualifiedName")) + } + } + + // Pre-set cluster name for testing in `test/resources/atlas-application.properties` + private val cluster = "test" + + def getQualifiedName(objId: AtlasObjectId): String = { + objId.getUniqueAttributes.get("qualifiedName").asInstanceOf[String] + } + + class MockAtlasClient() extends AtlasClient { + private var _entities: Seq[AtlasEntity] = _ + + override def send(entities: Seq[AtlasEntity]): Unit = { + _entities = entities + } + + def getEntities: Seq[AtlasEntity] = _entities + + override def close(): Unit = {} + } +} diff --git a/pom.xml b/pom.xml index 3b08ef622..515c15af9 100644 --- a/pom.xml +++ b/pom.xml @@ -123,6 +123,7 @@ 4.9.3 4.3.4 https://archive.apache.org/dist + 2.3.0 1.67 4.2.8 1.5.0