diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml b/extensions/spark/kyuubi-spark-lineage/pom.xml
index ede7bcd9d..d182938d9 100644
--- a/extensions/spark/kyuubi-spark-lineage/pom.xml
+++ b/extensions/spark/kyuubi-spark-lineage/pom.xml
@@ -24,7 +24,6 @@
org.apache.kyuubi
1.7.0-SNAPSHOT
../../../pom.xml
-
4.0.0
@@ -34,18 +33,6 @@
https://kyuubi.apache.org/
-
- org.apache.kyuubi
- kyuubi-common_${scala.binary.version}
- ${project.version}
-
-
-
- org.apache.kyuubi
- kyuubi-events_${scala.binary.version}
- ${project.version}
-
-
org.apache.spark
spark-sql_${scala.binary.version}
@@ -58,6 +45,13 @@
test
+
+ org.apache.kyuubi
+ kyuubi-common_${scala.binary.version}
+ ${project.version}
+ test
+
+
org.apache.kyuubi
kyuubi-common_${scala.binary.version}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
index fd987fef0..c27d2eb8b 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/SparkOperationLineageQueryExecutionListener.scala
@@ -17,10 +17,10 @@
package org.apache.kyuubi.plugin.lineage
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
-import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.plugin.lineage.events.OperationLineageEvent
import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
@@ -29,12 +29,12 @@ class SparkOperationLineageQueryExecutionListener extends QueryExecutionListener
+  /**
+   * Extracts the lineage of the successfully executed query from its optimized plan and
+   * publishes it as an [[OperationLineageEvent]] on the Spark listener bus.
+   *
+   * @param funcName   name of the action that triggered the execution (unused here)
+   * @param qe         the finished query execution; supplies the execution id, the optimized
+   *                   plan and the session whose SparkContext receives the event
+   * @param durationNs execution time in nanoseconds (unused here)
+   */
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
val lineage =
SparkSQLLineageParseHelper(qe.sparkSession).transformToLineage(qe.id, qe.optimizedPlan)
- val eventTime = System.currentTimeMillis()
- EventBus.post(OperationLineageEvent(qe.id, eventTime, lineage, None))
+    val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), lineage, None)
+    // Post to the context that actually ran the query. The helper's default resolves
+    // SparkSession.active on the calling thread, which depends on global session state
+    // and fails when no active/default session is registered.
+    SparkContextHelper.postEventToSparkListenerBus(event, qe.sparkSession.sparkContext)
}
+  /**
+   * Publishes an [[OperationLineageEvent]] that carries the failure of a query execution;
+   * no lineage is attached to it.
+   *
+   * @param funcName  name of the action that triggered the execution (unused here)
+   * @param qe        the failed query execution; supplies the execution id and the session
+   *                  whose SparkContext receives the event
+   * @param exception the error the execution failed with, forwarded inside the event
+   */
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
- val eventTime = System.currentTimeMillis()
- EventBus.post(OperationLineageEvent(qe.id, eventTime, None, Some(exception)))
+    val event = OperationLineageEvent(qe.id, System.currentTimeMillis(), None, Some(exception))
+    // Post to the context that actually ran the query. The helper's default resolves
+    // SparkSession.active on the calling thread; if that lookup threw here, it would
+    // mask the original query failure.
+    SparkContextHelper.postEventToSparkListenerBus(event, qe.sparkSession.sparkContext)
}
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
index 9c7c7aad4..aa46ed9fc 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEvent.scala
@@ -17,8 +17,7 @@
package org.apache.kyuubi.plugin.lineage.events
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.spark.scheduler.SparkListenerEvent
case class ColumnLineage(column: String, originalColumns: Set[String])
@@ -66,8 +65,4 @@ case class OperationLineageEvent(
executionId: Long,
eventTime: Long,
lineage: Option[Lineage],
- exception: Option[Throwable]) extends KyuubiEvent {
-
- override def partitions: Seq[(String, String)] =
- ("day", Utils.getDateFromTimestamp(eventTime)) :: Nil
-}
+ exception: Option[Throwable]) extends SparkListenerEvent
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SemanticVersion.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SemanticVersion.scala
new file mode 100644
index 000000000..a4a8b2e0e
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SemanticVersion.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.helper
+
+/**
+ * Encapsulate a component (Kyuubi/Spark/Hive/Flink etc.) version
+ * for the convenience of version checks.
+ *
+ * Only the major and minor numbers are kept, so every comparison below ignores whatever
+ * follows the minor number in the target version string.
+ *
+ * @param majorVersion the major version number
+ * @param minorVersion the minor version number
+ */
+case class SemanticVersion(majorVersion: Int, minorVersion: Int) {
+
+  /**
+   * @param targetVersionString version to compare against, e.g. "3.3" or "3.3.1"
+   * @return true if this version is lower than or equal to the target (major.minor only)
+   * @throws IllegalArgumentException if the target string cannot be parsed
+   */
+  def isVersionAtMost(targetVersionString: String): Boolean = {
+    this.compareVersion(
+      targetVersionString,
+      (targetMajor: Int, targetMinor: Int, runtimeMajor: Int, runtimeMinor: Int) =>
+        (runtimeMajor < targetMajor) || {
+          runtimeMajor == targetMajor && runtimeMinor <= targetMinor
+        })
+  }
+
+  /**
+   * @param targetVersionString version to compare against, e.g. "3.3" or "3.3.1"
+   * @return true if this version is higher than or equal to the target (major.minor only)
+   * @throws IllegalArgumentException if the target string cannot be parsed
+   */
+  def isVersionAtLeast(targetVersionString: String): Boolean = {
+    this.compareVersion(
+      targetVersionString,
+      (targetMajor: Int, targetMinor: Int, runtimeMajor: Int, runtimeMinor: Int) =>
+        (runtimeMajor > targetMajor) || {
+          runtimeMajor == targetMajor && runtimeMinor >= targetMinor
+        })
+  }
+
+  /**
+   * @param targetVersionString version to compare against, e.g. "3.3" or "3.3.1"
+   * @return true if major and minor numbers both equal those of the target
+   * @throws IllegalArgumentException if the target string cannot be parsed
+   */
+  def isVersionEqualTo(targetVersionString: String): Boolean = {
+    this.compareVersion(
+      targetVersionString,
+      (targetMajor: Int, targetMinor: Int, runtimeMajor: Int, runtimeMinor: Int) =>
+        runtimeMajor == targetMajor && runtimeMinor == targetMinor)
+  }
+
+  /**
+   * Parses the target version and hands both versions to `callback`.
+   *
+   * @param targetVersionString version string to parse and compare against
+   * @param callback predicate invoked as
+   *                 `(targetMajor, targetMinor, thisMajor, thisMinor)`
+   * @return whatever `callback` returns
+   * @throws IllegalArgumentException if the target string cannot be parsed
+   */
+  def compareVersion(
+      targetVersionString: String,
+      callback: (Int, Int, Int, Int) => Boolean): Boolean = {
+    val targetVersion = SemanticVersion(targetVersionString)
+    callback(
+      targetVersion.majorVersion,
+      targetVersion.minorVersion,
+      this.majorVersion,
+      this.minorVersion)
+  }
+
+  override def toString: String = s"$majorVersion.$minorVersion"
+}
+
+object SemanticVersion {
+
+  // Compiled once and shared: every comparison parses its target string through apply(),
+  // so building the Regex inside apply() would recompile the pattern on each version check.
+  private val VersionPattern = """^(\d+)\.(\d+)(\..*)?$""".r
+
+  /**
+   * Parses a version string of the form `major.minor`, optionally followed by `.` and
+   * anything else (for example "3.3", "3.3.1" or "1.7.0-SNAPSHOT").
+   *
+   * @param versionString the version string to parse
+   * @return a [[SemanticVersion]] holding the parsed major and minor numbers
+   * @throws IllegalArgumentException if the string does not match that form
+   */
+  def apply(versionString: String): SemanticVersion = {
+    VersionPattern.findFirstMatchIn(versionString) match {
+      case Some(m) =>
+        SemanticVersion(m.group(1).toInt, m.group(2).toInt)
+      case None =>
+        throw new IllegalArgumentException(s"Tried to parse '$versionString' as a project" +
+          s" version string, but it could not find the major and minor version numbers.")
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
index cd44ad8c9..f2808a4e9 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
@@ -19,8 +19,6 @@ package org.apache.kyuubi.plugin.lineage.helper
import org.apache.spark.SPARK_VERSION
-import org.apache.kyuubi.engine.SemanticVersion
-
object SparkListenerHelper {
lazy val sparkMajorMinorVersion: (Int, Int) = {
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 67c9b5679..f2d8529d2 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.lineage.helper
import scala.collection.immutable.ListMap
import scala.util.{Failure, Success, Try}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, HiveTableRelation}
@@ -32,7 +33,6 @@ import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, TableC
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
-import org.apache.kyuubi.Logging
import org.apache.kyuubi.plugin.lineage.events.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
@@ -359,7 +359,7 @@ case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends Lineag
plan: LogicalPlan): Option[Lineage] = {
Try(parse(plan)).recover {
case e: Exception =>
- warn(s"Extract Statement[$executionId] columns lineage failed.", e)
+ logWarning(s"Extract Statement[$executionId] columns lineage failed.", e)
throw e
}.toOption
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/SparkContextHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/SparkContextHelper.scala
new file mode 100644
index 000000000..e6272364f
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/SparkContextHelper.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.lineage
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Thin bridge for publishing events on Spark's listener bus.
+ *
+ * NOTE(review): this object lives under the `org.apache.spark` package tree, presumably so
+ * that it can reach `SparkContext.listenerBus`, which is not part of Spark's public API --
+ * confirm before moving it.
+ */
+object SparkContextHelper {
+
+  /** The [[SparkContext]] of whatever `SparkSession.active` resolves to at call time. */
+  def globalSparkContext: SparkContext = SparkSession.active.sparkContext
+
+  /**
+   * Posts `event` to the listener bus of `sc`.
+   *
+   * @param event the event to publish
+   * @param sc    the context whose bus receives the event; defaults to
+   *              [[globalSparkContext]], looked up on each call that omits this argument
+   */
+  def postEventToSparkListenerBus(
+      event: SparkListenerEvent,
+      sc: SparkContext = globalSparkContext): Unit = {
+    // Explicit `: Unit =` replaces procedure syntax, which Scala 2.13 deprecates.
+    sc.listenerBus.post(event)
+  }
+
+}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
index 3ccddf316..bb7fc9198 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
@@ -17,13 +17,13 @@
package org.apache.kyuubi.plugin.lineage.events
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkListenerExtensionTest
import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
@@ -45,13 +45,19 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
test("operation lineage event capture: for execute sql") {
val countDownLatch = new CountDownLatch(1)
var actual: Lineage = null
- EventBus.register[OperationLineageEvent] { event =>
- event.lineage.foreach {
- case lineage if lineage.inputTables.nonEmpty =>
- actual = lineage
- countDownLatch.countDown()
+ spark.sparkContext.addSparkListener(new SparkListener {
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case lineageEvent: OperationLineageEvent =>
+ lineageEvent.lineage.foreach {
+ case lineage if lineage.inputTables.nonEmpty =>
+ actual = lineage
+ countDownLatch.countDown()
+ }
+ case _ =>
+ }
}
- }
+ })
withTable("test_table0") { _ =>
spark.sql("create table test_table0(a string, b string)")
@@ -62,7 +68,7 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
List(
("col0", Set("default.test_table0.a")),
("col1", Set("default.test_table0.b"))))
- countDownLatch.await()
+ countDownLatch.await(20, TimeUnit.SECONDS)
assert(actual == expected)
}
}