[KYUUBI #3537] Remove kyuubi dependency of the spark lineage plugin

### _Why are the changes needed?_

Remove kyuubi dependency of the spark lineage plugin.

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [X] Add screenshots for manual tests if appropriate
![image](https://user-images.githubusercontent.com/17894939/191468933-a7b76b2c-a888-4a45-b294-794194c8ef03.png)

- [X] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3537 from wForget/dev-lineage.

Closes #3537

1cd2f549 [Wang Zhen] comment
71b11506 [Wang Zhen] fix and add test
d7465799 [Wang Zhen] fix test
9e44840f [Wang Zhen] Remove kyuubi dependency of the spark lineage plugin

Authored-by: Wang Zhen <wangzhen07@qiyi.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Wang Zhen 2022-11-03 15:46:48 +00:00 committed by Cheng Pan
parent 31d2ec1184
commit 7f83c454b6
8 changed files with 139 additions and 38 deletions

View File

@ -24,7 +24,6 @@
<groupId>org.apache.kyuubi</groupId>
<version>1.7.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -34,18 +33,6 @@
<url>https://kyuubi.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-events_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
@ -58,6 +45,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>

View File

@ -17,10 +17,10 @@
package org.apache.kyuubi.plugin.lineage
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.plugin.lineage.events.OperationLineageEvent
import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
@ -29,12 +29,12 @@ class SparkOperationLineageQueryExecutionListener extends QueryExecutionListener
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
val lineage =
SparkSQLLineageParseHelper(qe.sparkSession).transformToLineage(qe.id, qe.optimizedPlan)
val eventTime = System.currentTimeMillis()
EventBus.post(OperationLineageEvent(qe.id, eventTime, lineage, None))
val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), lineage, None)
SparkContextHelper.postEventToSparkListenerBus(event)
}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
val eventTime = System.currentTimeMillis()
EventBus.post(OperationLineageEvent(qe.id, eventTime, None, Some(exception)))
val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
SparkContextHelper.postEventToSparkListenerBus(event)
}
}

View File

@ -17,8 +17,7 @@
package org.apache.kyuubi.plugin.lineage.events
import org.apache.kyuubi.Utils
import org.apache.kyuubi.events.KyuubiEvent
import org.apache.spark.scheduler.SparkListenerEvent
case class ColumnLineage(column: String, originalColumns: Set[String])
@ -66,8 +65,4 @@ case class OperationLineageEvent(
executionId: Long,
eventTime: Long,
lineage: Option[Lineage],
exception: Option[Throwable]) extends KyuubiEvent {
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(eventTime)) :: Nil
}
exception: Option[Throwable]) extends SparkListenerEvent

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.lineage.helper
/**
 * Encapsulate a component (Kyuubi/Spark/Hive/Flink etc.) version
 * for the convenience of version checks.
 */
case class SemanticVersion(majorVersion: Int, minorVersion: Int) {

  /** Whether this version is lower than or equal to the target "major.minor" version. */
  def isVersionAtMost(targetVersionString: String): Boolean =
    compareVersion(
      targetVersionString,
      (targetMajor, targetMinor, runtimeMajor, runtimeMinor) =>
        runtimeMajor < targetMajor ||
          (runtimeMajor == targetMajor && runtimeMinor <= targetMinor))

  /** Whether this version is higher than or equal to the target "major.minor" version. */
  def isVersionAtLeast(targetVersionString: String): Boolean =
    compareVersion(
      targetVersionString,
      (targetMajor, targetMinor, runtimeMajor, runtimeMinor) =>
        runtimeMajor > targetMajor ||
          (runtimeMajor == targetMajor && runtimeMinor >= targetMinor))

  /** Whether this version has exactly the target major and minor version. */
  def isVersionEqualTo(targetVersionString: String): Boolean =
    compareVersion(
      targetVersionString,
      (targetMajor, targetMinor, runtimeMajor, runtimeMinor) =>
        runtimeMajor == targetMajor && runtimeMinor == targetMinor)

  /**
   * Parses the target version string and hands (targetMajor, targetMinor,
   * runtimeMajor, runtimeMinor) to the supplied comparison callback.
   *
   * @param targetVersionString version to compare against, e.g. "3.3" or "3.3.1"
   * @param callback            comparison to evaluate on the four major/minor numbers
   */
  def compareVersion(
      targetVersionString: String,
      callback: (Int, Int, Int, Int) => Boolean): Boolean = {
    val target = SemanticVersion(targetVersionString)
    callback(target.majorVersion, target.minorVersion, majorVersion, minorVersion)
  }

  override def toString: String = s"$majorVersion.$minorVersion"
}

object SemanticVersion {

  /**
   * Builds a [[SemanticVersion]] from a version string such as "3.4.0" or
   * "3.4.0-SNAPSHOT"; only the leading "major.minor" part is retained.
   *
   * @throws IllegalArgumentException if the major and minor numbers cannot be extracted
   */
  def apply(versionString: String): SemanticVersion = {
    // Accepts "major.minor" optionally followed by ".anything" (e.g. "3.4.0-SNAPSHOT").
    val versionPattern = """^(\d+)\.(\d+)(\..*)?$""".r
    versionPattern.findFirstMatchIn(versionString) match {
      case Some(m) => SemanticVersion(m.group(1).toInt, m.group(2).toInt)
      case None =>
        throw new IllegalArgumentException(s"Tried to parse '$versionString' as a project" +
          s" version string, but it could not find the major and minor version numbers.")
    }
  }
}

View File

@ -19,8 +19,6 @@ package org.apache.kyuubi.plugin.lineage.helper
import org.apache.spark.SPARK_VERSION
import org.apache.kyuubi.engine.SemanticVersion
object SparkListenerHelper {
lazy val sparkMajorMinorVersion: (Int, Int) = {

View File

@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.lineage.helper
import scala.collection.immutable.ListMap
import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, HiveTableRelation}
@ -32,7 +33,6 @@ import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, TableC
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.kyuubi.Logging
import org.apache.kyuubi.plugin.lineage.events.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
@ -359,7 +359,7 @@ case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends Lineag
plan: LogicalPlan): Option[Lineage] = {
Try(parse(plan)).recover {
case e: Exception =>
warn(s"Extract Statement[$executionId] columns lineage failed.", e)
logWarning(s"Extract Statement[$executionId] columns lineage failed.", e)
throw e
}.toOption
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.kyuubi.lineage
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.SparkSession
object SparkContextHelper {

  /** The SparkContext of the currently active SparkSession. */
  def globalSparkContext: SparkContext = SparkSession.active.sparkContext

  /**
   * Posts the given event to the Spark listener bus so that registered
   * SparkListeners can consume it via onOtherEvent.
   *
   * NOTE(review): this object lives under the org.apache.spark package,
   * presumably to reach the private[spark] listenerBus — confirm.
   *
   * @param event the event to deliver
   * @param sc    the SparkContext whose listener bus receives the event;
   *              defaults to the active session's context
   */
  def postEventToSparkListenerBus(
      event: SparkListenerEvent,
      sc: SparkContext = globalSparkContext): Unit = {
    // Explicit ": Unit =" replaces the deprecated procedure syntax
    // (removed in Scala 3, warns in 2.13).
    sc.listenerBus.post(event)
  }
}

View File

@ -17,13 +17,13 @@
package org.apache.kyuubi.plugin.lineage.events
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkListenerExtensionTest
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
@ -45,13 +45,19 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
test("operation lineage event capture: for execute sql") {
val countDownLatch = new CountDownLatch(1)
var actual: Lineage = null
EventBus.register[OperationLineageEvent] { event =>
event.lineage.foreach {
case lineage if lineage.inputTables.nonEmpty =>
actual = lineage
countDownLatch.countDown()
spark.sparkContext.addSparkListener(new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case lineageEvent: OperationLineageEvent =>
lineageEvent.lineage.foreach {
case lineage if lineage.inputTables.nonEmpty =>
actual = lineage
countDownLatch.countDown()
}
case _ =>
}
}
}
})
withTable("test_table0") { _ =>
spark.sql("create table test_table0(a string, b string)")
@ -62,7 +68,7 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
List(
("col0", Set("default.test_table0.a")),
("col1", Set("default.test_table0.b"))))
countDownLatch.await()
countDownLatch.await(20, TimeUnit.SECONDS)
assert(actual == expected)
}
}