[KYUUBI #5196] Enable CI tests on Scala 2.13 and support a custom or spark-core-extracted Scala version for the Spark engine

### _Why are the changes needed?_

- Enable CI tests on Scala 2.13 for all modules except the Flink SQL engine
- For testing, choose the available Spark engine home in the `kyuubi-download` module based on the Kyuubi server's `SCALA_COMPILE_VERSION`
- Choose the Scala version for the Spark engine's main resource jar in the following order (see the sketch after this list):
  1. the `SPARK_SCALA_VERSION` environment variable
  2. the Scala version extracted from the `spark-core` jar filename under the Spark home
- Fixed an assertion error in the kyuubi-spark-lineage module, as Spark on Scala 2.12 and 2.13 emits the column lineage of the `MergeIntoTable` unit test in different orders:
```
SparkSQLLineageParserHelperSuite:
- columns lineage extract - MergeIntoTable *** FAILED ***
  inputTables(List(v2_catalog.db.source_t))
  outputTables(List(v2_catalog.db.target_t))
  columnLineage(List(ColumnLineage(v2_catalog.db.target_t.name,Set(v2_catalog.db.source_t.name)), ColumnLineage(v2_catalog.db.target_t.price,Set(v2_catalog.db.source_t.price)), ColumnLineage(v2_catalog.db.target_t.id,Set(v2_catalog.db.source_t.id)))) did not equal inputTables(List(v2_catalog.db.source_t))
  outputTables(List(v2_catalog.db.target_t))
  columnLineage(List(ColumnLineage(v2_catalog.db.target_t.id,Set(v2_catalog.db.source_t.id)), ColumnLineage(v2_catalog.db.target_t.name,Set(v2_catalog.db.source_t.name)), ColumnLineage(v2_catalog.db.target_t.price,Set(v2_catalog.db.source_t.price)))) (SparkSQLLineageParserHelperSuite.scala:182)
```
- Fixed other tests that rely on Scala scripting output, whose textual format differs slightly between Scala 2.12 and 2.13
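
For reference, the version lookup described above boils down to the following minimal sketch (the `ScalaVersionSketch` object and `resolveScalaVersion` helper are illustrative names, not the actual API; the real logic lives in `SparkProcessBuilder`):

```scala
import java.nio.file.Paths

object ScalaVersionSketch {
  // mirrors SPARK_CORE_SCALA_VERSION_REGEX introduced in this PR
  private val SparkCoreJar = """^spark-core_(\d\.\d+).*.jar$""".r

  def resolveScalaVersion(sparkHome: String): String =
    // 1. the SPARK_SCALA_VERSION environment variable wins when set
    sys.env.get("SPARK_SCALA_VERSION").filterNot(_.isEmpty).getOrElse {
      // 2. otherwise extract the version from the spark-core jar filename,
      //    e.g. "spark-core_2.13-3.4.1.jar" -> "2.13"
      val jars = Option(Paths.get(sparkHome, "jars").toFile.list())
        .getOrElse(Array.empty[String])
      jars.collectFirst { case SparkCoreJar(scalaVersion) => scalaVersion }
        .getOrElse(sys.error(s"no spark-core jar found under $sparkHome/jars"))
    }
}
```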

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

Closes #5196 from bowenliang123/scala213-test.

Closes #5196

97fafacd3 [liangbowen] prevent repeated compilation for regex pattern
76b99d423 [Bowen Liang] test on scala-2.13

Lead-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
Bowen Liang 2023-10-10 08:42:40 +08:00
parent 7eac400488
commit e33df9ce66
14 changed files with 186 additions and 45 deletions

View File

@@ -170,16 +170,18 @@ jobs:
**/target/unit-tests.log
**/kyuubi-spark-sql-engine.log*
scala213:
name: Scala Compilation Test
scala-test:
name: Scala Test
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
java:
- '8'
scala:
- '2.13'
java:
- '8'
spark:
- '3.4'
steps:
- uses: actions/checkout@v3
- name: Tune Runner VM
@@ -193,14 +195,24 @@ jobs:
check-latest: false
- name: Setup Maven
uses: ./.github/actions/setup-maven
- name: Cache Engine Archives
uses: ./.github/actions/cache-engine-archives
- name: Build on Scala ${{ matrix.scala }}
run: |
MODULES='!externals/kyuubi-flink-sql-engine'
./build/mvn clean install -pl ${MODULES} -am \
-DskipTests -Pflink-provided,hive-provided,spark-provided \
-Pjava-${{ matrix.java }} \
-Pscala-${{ matrix.scala }} \
-Pspark-3.3
TEST_MODULES="!externals/kyuubi-flink-sql-engine,!integration-tests/kyuubi-flink-it"
./build/mvn clean install ${MVN_OPT} -pl ${TEST_MODULES} -am \
-Pscala-${{ matrix.scala }} -Pjava-${{ matrix.java }} -Pspark-${{ matrix.spark }}
- name: Upload test logs
if: failure()
uses: actions/upload-artifact@v3
with:
name: unit-tests-log-scala-${{ matrix.scala }}-java-${{ matrix.java }}-spark-${{ matrix.spark }}
path: |
**/target/unit-tests.log
**/kyuubi-spark-sql-engine.log*
**/kyuubi-spark-batch-submit.log*
**/kyuubi-jdbc-engine.log*
**/kyuubi-hive-sql-engine.log*
flink-it:
name: Flink Test

View File

@@ -32,8 +32,9 @@ class Lineage(
override def equals(other: Any): Boolean = other match {
case otherLineage: Lineage =>
otherLineage.inputTables == inputTables && otherLineage.outputTables == outputTables &&
otherLineage.columnLineage == columnLineage
otherLineage.inputTables.toSet == inputTables.toSet &&
otherLineage.outputTables.toSet == outputTables.toSet &&
otherLineage.columnLineage.toSet == columnLineage.toSet
case _ => false
}
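
The hunk above relaxes `Lineage.equals` to compare the three collections as sets, so the assertion no longer depends on the element order that differs between Scala 2.12 and 2.13. A quick illustration with simplified stand-in classes (not the real lineage types):

```scala
case class ColumnLineage(column: String, origins: Set[String])

val onScala212 = List(
  ColumnLineage("target_t.name", Set("source_t.name")),
  ColumnLineage("target_t.id", Set("source_t.id")))
val onScala213 = onScala212.reverse

assert(onScala212 != onScala213)             // List equality is order-sensitive
assert(onScala212.toSet == onScala213.toSet) // Set equality is not
```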

View File

@@ -112,4 +112,8 @@
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@@ -69,4 +69,9 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@@ -114,5 +114,7 @@
</executions>
</plugin>
</plugins>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@@ -88,4 +88,9 @@
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@@ -17,8 +17,11 @@
package org.apache.kyuubi.operation
import scala.collection.mutable.ListBuffer
import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.util.AssertionUtils._
import org.apache.kyuubi.util.SparkVersionUtil
trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin with SparkVersionUtil {
@@ -27,10 +30,11 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
val catalogs = metaData.getCatalogs
catalogs.next()
assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
catalogs.next()
assert(catalogs.getString(TABLE_CAT) === catalog)
val results = ListBuffer[String]()
while (catalogs.next()) {
results += catalogs.getString(TABLE_CAT)
}
assertContains(results, "spark_catalog", catalog)
}
}

View File

@@ -270,7 +270,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
|""".stripMargin
val rs1 = statement.executeQuery(code)
rs1.next()
assert(rs1.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
assert(rs1.getString(1) contains "df: org.apache.spark.sql.DataFrame")
// continue
val rs2 = statement.executeQuery("df.count()")
@@ -311,7 +311,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
|""".stripMargin
val rs5 = statement.executeQuery(code2)
rs5.next()
assert(rs5.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
assert(rs5.getString(1) contains "df: org.apache.spark.sql.DataFrame")
// re-assign
val rs6 = statement.executeQuery("result.set(df)")
@@ -420,7 +420,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
statement.execute(code1)
val rs = statement.executeQuery(code2)
rs.next()
assert(rs.getString(1) == "x: Int = 3")
assert(rs.getString(1) contains "x: Int = 3")
}
}

View File

@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine
import java.io.{File, FilenameFilter, IOException}
import java.io.{File, FileFilter, IOException}
import java.net.URI
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
@@ -56,13 +56,14 @@ trait ProcBuilder {
}
}
protected val engineScalaBinaryVersion: String = SCALA_COMPILE_VERSION
/**
* The engine jar or other runnable jar containing the main method
*/
def mainResource: Option[String] = {
// 1. get the main resource jar for user specified config first
// TODO use SPARK_SCALA_VERSION instead of SCALA_COMPILE_VERSION
val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
val jarName: String = s"${module}_$engineScalaBinaryVersion-$KYUUBI_VERSION.jar"
conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter { userSpecified =>
// skip check exist if not local file.
val uri = new URI(userSpecified)
@@ -295,6 +296,11 @@ trait ProcBuilder {
}
}
protected lazy val engineHomeDirFilter: FileFilter = file => {
val fileName = file.getName
file.isDirectory && fileName.contains(s"$shortName-") && !fileName.contains("-engine")
}
/**
* Get the home directly that contains binary distributions of engines.
*
@@ -311,9 +317,6 @@
* @return SPARK_HOME, HIVE_HOME, etc.
*/
protected def getEngineHome(shortName: String): String = {
val homeDirFilter: FilenameFilter = (dir: File, name: String) =>
dir.isDirectory && name.contains(s"$shortName-") && !name.contains("-engine")
val homeKey = s"${shortName.toUpperCase}_HOME"
// 1. get from env, e.g. SPARK_HOME, FLINK_HOME
env.get(homeKey)
@@ -321,14 +324,14 @@
// 2. get from $KYUUBI_HOME/externals/kyuubi-download/target
env.get(KYUUBI_HOME).flatMap { p =>
val candidates = Paths.get(p, "externals", "kyuubi-download", "target")
.toFile.listFiles(homeDirFilter)
.toFile.listFiles(engineHomeDirFilter)
if (candidates == null) None else candidates.map(_.toPath).headOption
}.filter(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
}.orElse {
// 3. get from kyuubi-server/../externals/kyuubi-download/target
Utils.getCodeSourceLocation(getClass).split("kyuubi-server").flatMap { cwd =>
val candidates = Paths.get(cwd, "externals", "kyuubi-download", "target")
.toFile.listFiles(homeDirFilter)
.toFile.listFiles(engineHomeDirFilter)
if (candidates == null) None else candidates.map(_.toPath).headOption
}.find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
} match {

View File

@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark
import java.io.{File, IOException}
import java.io.{File, FileFilter, IOException}
import java.nio.file.Paths
import java.util.Locale
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
@@ -35,8 +36,7 @@ import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.KubernetesUtils
import org.apache.kyuubi.util.Validator
import org.apache.kyuubi.util.{KubernetesUtils, Validator}
class SparkProcessBuilder(
override val proxyUser: String,
@@ -102,6 +102,25 @@ class SparkProcessBuilder(
}
}
private[kyuubi] def extractSparkCoreScalaVersion(fileNames: Iterable[String]): String = {
fileNames.collectFirst { case SPARK_CORE_SCALA_VERSION_REGEX(scalaVersion) => scalaVersion }
.getOrElse(throw new KyuubiException("Failed to extract Scala version from spark-core jar"))
}
override protected val engineScalaBinaryVersion: String = {
val sparkCoreScalaVersion =
extractSparkCoreScalaVersion(Paths.get(sparkHome, "jars").toFile.list())
StringUtils.defaultIfBlank(System.getenv("SPARK_SCALA_VERSION"), sparkCoreScalaVersion)
}
override protected lazy val engineHomeDirFilter: FileFilter = file => {
val r = SCALA_COMPILE_VERSION match {
case "2.12" => SPARK_HOME_REGEX_SCALA_212
case "2.13" => SPARK_HOME_REGEX_SCALA_213
}
file.isDirectory && r.findFirstMatchIn(file.getName).isDefined
}
override protected lazy val commands: Array[String] = {
// complete `spark.master` if absent on kubernetes
completeMasterUrl(conf)
@@ -314,4 +333,13 @@ object SparkProcessBuilder {
final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
final private[kyuubi] val SPARK_CORE_SCALA_VERSION_REGEX =
"""^spark-core_(\d\.\d+).*.jar$""".r
final private[kyuubi] val SPARK_HOME_REGEX_SCALA_212 =
"""^spark-\d+\.\d+\.\d+-bin-hadoop\d+(\.\d+)?$""".r
final private[kyuubi] val SPARK_HOME_REGEX_SCALA_213 =
"""^spark-\d+\.\d+\.\d+-bin-hadoop\d(\.\d+)?+-scala\d+(\.\d+)?$""".r
}
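
For reference: `SPARK_HOME_REGEX_SCALA_212` matches vanilla distribution directories such as `spark-3.4.1-bin-hadoop3`, while `SPARK_HOME_REGEX_SCALA_213` matches Scala 2.13 builds such as `spark-3.4.1-bin-hadoop3-scala2.13` — the same names the `spark.archive.scala.suffix` property produces in the root pom change at the end of this diff. The suite that follows exercises both patterns.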

View File

@@ -26,7 +26,7 @@ import java.util.concurrent.{Executors, TimeUnit}
import org.scalatest.time.SpanSugar._
import org.scalatestplus.mockito.MockitoSugar
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE}
import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
@@ -34,6 +34,7 @@ import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.service.ServiceUtils
import org.apache.kyuubi.util.AssertionUtils._
class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
private def conf = KyuubiConf().set("kyuubi.on", "off")
@@ -363,6 +364,46 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
.appendPodNameConf(conf3).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
assert(execPodNamePrefix3 === Some(s"kyuubi-$engineRefId"))
}
test("extract spark core scala version") {
val builder = new SparkProcessBuilder("kentyao", KyuubiConf(false))
Seq(
"spark-core_2.13-3.4.1.jar",
"spark-core_2.13-3.5.0-abc-20230921.jar",
"spark-core_2.13-3.5.0-xyz-1.2.3.jar",
"spark-core_2.13-3.5.0.1.jar",
"spark-core_2.13.2-3.5.0.jar").foreach { f =>
assertResult("2.13")(builder.extractSparkCoreScalaVersion(Seq(f)))
}
Seq(
"spark-dummy_2.13-3.5.0.jar",
"spark-core_2.13-3.5.0.1.zip",
"yummy-spark-core_2.13-3.5.0.jar").foreach { f =>
assertThrows[KyuubiException](builder.extractSparkCoreScalaVersion(Seq(f)))
}
}
test("match scala version of spark home") {
SCALA_COMPILE_VERSION match {
case "2.12" => Seq(
"spark-3.2.4-bin-hadoop3.2",
"spark-3.2.4-bin-hadoop2.7",
"spark-3.4.1-bin-hadoop3")
.foreach { sparkHome =>
assertMatches(sparkHome, SPARK_HOME_REGEX_SCALA_212)
assertNotMatches(sparkHome, SPARK_HOME_REGEX_SCALA_213)
}
case "2.13" => Seq(
"spark-3.2.4-bin-hadoop3.2-scala2.13",
"spark-3.4.1-bin-hadoop3-scala2.13",
"spark-3.5.0-bin-hadoop3-scala2.13")
.foreach { sparkHome =>
assertMatches(sparkHome, SPARK_HOME_REGEX_SCALA_213)
assertNotMatches(sparkHome, SPARK_HOME_REGEX_SCALA_212)
}
}
}
}
class FakeSparkProcessBuilder(config: KyuubiConf)

View File

@@ -86,8 +86,9 @@ class SessionSigningSuite extends WithKyuubiServer with HiveJDBCTestHelper {
assert(rs2.next())
// skipping prefix "res0: String = " of returned scala result
val publicKeyStr = rs1.getString(1).substring(15)
val sessionUserSign = rs2.getString(1).substring(15)
val sep = " = "
val publicKeyStr = StringUtils.substringAfter(rs1.getString(1), sep)
val sessionUserSign = StringUtils.substringAfter(rs2.getString(1), sep)
assert(StringUtils.isNotBlank(publicKeyStr))
assert(StringUtils.isNotBlank(sessionUserSign))

View File

@@ -23,14 +23,16 @@ import java.util.Locale
import scala.collection.Traversable
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.matching.Regex
import org.scalactic.{source, Prettifier}
import org.scalactic.Prettifier
import org.scalactic.source.Position
import org.scalatest.Assertions._
object AssertionUtils {
def assertEqualsIgnoreCase(expected: AnyRef)(actual: AnyRef)(
implicit pos: source.Position): Unit = {
implicit pos: Position): Unit = {
val isEqualsIgnoreCase = (Option(expected), Option(actual)) match {
case (Some(expectedStr: String), Some(actualStr: String)) =>
expectedStr.equalsIgnoreCase(actualStr)
@@ -44,15 +46,15 @@
}
}
def assertStartsWithIgnoreCase(expectedPrefix: String)(actual: String)(
implicit pos: source.Position): Unit = {
def assertStartsWithIgnoreCase(expectedPrefix: String)(actual: String)(implicit
pos: Position): Unit = {
if (!actual.toLowerCase(Locale.ROOT).startsWith(expectedPrefix.toLowerCase(Locale.ROOT))) {
fail(s"Expected starting with '$expectedPrefix' ignoring case, but got [$actual]")(pos)
}
}
def assertExistsIgnoreCase(expected: String)(actual: Iterable[String])(
implicit pos: source.Position): Unit = {
def assertExistsIgnoreCase(expected: String)(actual: Iterable[String])(implicit
pos: Position): Unit = {
if (!actual.exists(_.equalsIgnoreCase(expected))) {
fail(s"Expected containing '$expected' ignoring case, but got [$actual]")(pos)
}
@@ -73,7 +75,7 @@
regenScript: String,
splitFirstExpectedLine: Boolean = false)(implicit
prettifier: Prettifier,
pos: source.Position): Unit = {
pos: Position): Unit = {
val fileSource = Source.fromFile(path.toUri, StandardCharsets.UTF_8.name())
try {
def expectedLinesIter = if (splitFirstExpectedLine) {
@@ -104,13 +106,44 @@
}
}
/**
* Assert the iterable contains all the expected elements
*/
def assertContains(actual: TraversableOnce[AnyRef], expected: AnyRef*)(implicit
prettifier: Prettifier,
pos: Position): Unit =
withClue(s", expected containing [${expected.mkString(", ")}]") {
val actualSeq = actual.toSeq
expected.foreach { elem => assert(actualSeq.contains(elem))(prettifier, pos) }
}
/**
* Asserts the string matches the regex
*/
def assertMatches(actual: String, regex: Regex)(implicit
prettifier: Prettifier,
pos: Position): Unit =
withClue(s"'$actual' expected matching the regex '$regex'") {
assert(regex.findFirstMatchIn(actual).isDefined)(prettifier, pos)
}
/**
* Asserts the string does not match the regex
*/
def assertNotMatches(actual: String, regex: Regex)(implicit
prettifier: Prettifier,
pos: Position): Unit =
withClue(s"'$actual' expected not matching the regex '$regex'") {
assert(regex.findFirstMatchIn(actual).isEmpty)(prettifier, pos)
}
/**
* Asserts that the given function throws an exception of the given type
* and with the exception message equals to expected string
*/
def interceptEquals[T <: Exception](f: => Any)(expected: String)(implicit
classTag: ClassTag[T],
pos: source.Position): Unit = {
pos: Position): Unit = {
assert(expected != null)
val exception = intercept[T](f)(classTag, pos)
assertResult(expected)(exception.getMessage)
@@ -122,7 +155,7 @@
*/
def interceptContains[T <: Exception](f: => Any)(contained: String)(implicit
classTag: ClassTag[T],
pos: source.Position): Unit = {
pos: Position): Unit = {
assert(contained != null)
val exception = intercept[T](f)(classTag, pos)
assert(exception.getMessage.contains(contained))
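
Hypothetical usage of the newly added helpers from a test body (the implicit `Prettifier` and `Position` resolve from the scalactic defaults):

```scala
import org.apache.kyuubi.util.AssertionUtils._

// all expected elements must be present, in any order
assertContains(Seq("spark_catalog", "iceberg"), "spark_catalog", "iceberg")
// positive and negative regex matching
assertMatches("spark-3.4.1-bin-hadoop3-scala2.13", """scala2\.13""".r)
assertNotMatches("spark-3.4.1-bin-hadoop3", """scala2\.13""".r)
```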

View File

@@ -138,7 +138,7 @@
<fb303.version>0.9.3</fb303.version>
<flexmark.version>0.62.2</flexmark.version>
<flink.version>1.17.1</flink.version>
<flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
<flink.archive.name>flink-${flink.version}-bin-scala_2.12.tgz</flink.archive.name>
<flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
<flink.archive.download.skip>false</flink.archive.download.skip>
<google.jsr305.version>3.0.2</google.jsr305.version>
@@ -198,7 +198,8 @@
-->
<spark.version>3.4.1</spark.version>
<spark.binary.version>3.4</spark.binary.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.tgz</spark.archive.name>
<spark.archive.scala.suffix></spark.archive.scala.suffix>
<spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name>
<spark.archive.mirror>${apache.archive.dist}/spark/spark-${spark.version}</spark.archive.mirror>
<spark.archive.download.skip>false</spark.archive.download.skip>
<sqlite.version>3.42.0.0</sqlite.version>
@@ -2135,6 +2136,7 @@
<properties>
<scala.binary.version>2.13</scala.binary.version>
<scala.version>2.13.8</scala.version>
<spark.archive.scala.suffix>-scala${scala.binary.version}</spark.archive.scala.suffix>
</properties>
<build>
<pluginManagement>
@@ -2207,7 +2209,7 @@
<spark.version>3.2.4</spark.version>
<spark.binary.version>3.2</spark.binary.version>
<delta.version>2.0.2</delta.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.2.tgz</spark.archive.name>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.2${spark.archive.scala.suffix}.tgz</spark.archive.name>
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow</maven.plugin.scalatest.exclude.tags>
</properties>
</profile>