[KYUUBI #5188] Make server module and Spark sql engine module compilable on Scala 2.13

### _Why are the changes needed?_

- add a basic compilation-only CI job on Scala 2.13, skipping test runs
- provide separate versions of `KyuubiSparkILoop` for Scala 2.12 and 2.13, adapting to the reorganized Scala interpreter packages
- rename the `export` variable, since `export` becomes a keyword in Scala 3 and Scala 2.13 warns about it (error below; a minimal illustration follows):
```
[Error] /Users/bw/dev/kyuubi/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala:563: Wrap `export` in backticks to use it as an identifier, it will become a keyword in Scala 3.
```
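
A minimal sketch of the two possible fixes (hypothetical snippet, not taken from the actual suite code):
```
// `export` will be a keyword in Scala 3, so Scala 2.13 requires backticks
// to keep using it as an identifier:
val `export` = "frontend-service" // compiles, but awkward to read and write

// The fix adopted in this patch: rename the variable instead.
val restFrontendService = "frontend-service"
```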

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5188 from bowenliang123/sparksql-213.

Closes #5188

04f192064 [liangbowen] update
9e764271b [liangbowen] add ci for compilation with server module
a465375bd [liangbowen] update
5c3f24fdf [liangbowen] update
4b6a6e339 [liangbowen] use Iterable for the row in MySQLTextResultSetRowPacket
f09d61d26 [liangbowen] use ListMap.newBuilder
6b5480872 [liangbowen] Use Iterable for collections
1abfa29d8 [liangbowen] 2.12's KyuubiSparkILoop
b1c9da591 [liangbowen] rename export variable
b6a6e077b [liangbowen] remove original KyuubiSparkILoop
15438b503 [liangbowen] move back scala 2.12's KyuubiSparkILoop
dd3244351 [liangbowen] adapt spark sql module to 2.13
62d3abbf0 [liangbowen] adapt server module to 2.13

Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
liangbowen 2023-08-23 16:52:19 +08:00
parent bdf867b19a
commit 22a47044e9
13 changed files with 209 additions and 15 deletions


@@ -168,6 +168,37 @@ jobs:
             **/target/unit-tests.log
             **/kyuubi-spark-sql-engine.log*
+  scala213:
+    name: Scala Compilation Test
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java:
+          - '8'
+        scala:
+          - '2.13'
+    steps:
+      - uses: actions/checkout@v3
+      - name: Tune Runner VM
+        uses: ./.github/actions/tune-runner-vm
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: ${{ matrix.java }}
+          cache: 'maven'
+          check-latest: false
+      - name: Setup Maven
+        uses: ./.github/actions/setup-maven
+      - name: Build on Scala
+        run: |
+          MODULES="kyuubi-server"
+          ./build/mvn clean install -pl ${MODULES} -am \
+            -DskipTests -Pflink-provided,hive-provided,spark-provided \
+            -Pjava-${{ matrix.java }} \
+            -Pscala-${{ matrix.scala }}
   flink-it:
     name: Flink Test
     runs-on: ubuntu-22.04


@@ -174,6 +174,37 @@
   <build>
     <plugins>
+      <!-- Include a source dir depending on the Scala version -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-scala-sources</id>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <phase>generate-sources</phase>
+            <configuration>
+              <sources>
+                <source>src/main/scala-${scala.binary.version}</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-scala-test-sources</id>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <phase>generate-test-sources</phase>
+            <configuration>
+              <sources>
+                <source>src/test/scala-${scala.binary.version}</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
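
With both executions registered, Maven compiles a version-specific directory alongside the shared sources, so `src/main/scala-2.12` and `src/main/scala-2.13` (and the matching test directories) can each hold code that only compiles on one Scala line, such as the two `KyuubiSparkILoop` variants.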


@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.repl
+
+import java.io.{ByteArrayOutputStream, File, PrintWriter}
+import java.util.concurrent.locks.ReentrantLock
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.{IMain, Results}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.repl.SparkILoop
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.util.MutableURLClassLoader
+
+import org.apache.kyuubi.Utils
+
+private[spark] case class KyuubiSparkILoop private (
+    spark: SparkSession,
+    output: ByteArrayOutputStream)
+  extends SparkILoop(null, new PrintWriter(output)) {
+  import KyuubiSparkILoop._
+
+  val result = new DataFrameHolder(spark)
+
+  private def initialize(): Unit = withLockRequired {
+    val settings = new Settings
+    val interpArguments = List(
+      "-Yrepl-class-based",
+      "-Yrepl-outdir",
+      s"${spark.sparkContext.getConf.get("spark.repl.class.outputDir")}")
+    settings.processArguments(interpArguments, processAll = true)
+    settings.usejavacp.value = true
+    val currentClassLoader = Thread.currentThread().getContextClassLoader
+    settings.embeddedDefaults(currentClassLoader)
+    this.createInterpreter(settings)
+    val iMain = this.intp.asInstanceOf[IMain]
+    iMain.initializeCompiler()
+    try {
+      this.compilerClasspath
+      iMain.ensureClassLoader()
+      var classLoader: ClassLoader = Thread.currentThread().getContextClassLoader
+      while (classLoader != null) {
+        classLoader match {
+          case loader: MutableURLClassLoader =>
+            val allJars = loader.getURLs.filter { u =>
+              val file = new File(u.getPath)
+              u.getProtocol == "file" && file.isFile &&
+                file.getName.contains("scala-lang_scala-reflect")
+            }
+            this.addUrlsToClassPath(allJars: _*)
+            classLoader = null
+          case _ =>
+            classLoader = classLoader.getParent
+        }
+      }
+      this.addUrlsToClassPath(
+        classOf[DataFrameHolder].getProtectionDomain.getCodeSource.getLocation)
+    } finally {
+      Thread.currentThread().setContextClassLoader(currentClassLoader)
+    }
+
+    this.beQuietDuring {
+      // SparkSession/SparkContext and their implicits
+      this.bind("spark", classOf[SparkSession].getCanonicalName, spark, List("""@transient"""))
+      this.bind(
+        "sc",
+        classOf[SparkContext].getCanonicalName,
+        spark.sparkContext,
+        List("""@transient"""))
+
+      this.interpret("import org.apache.spark.SparkContext._")
+      this.interpret("import spark.implicits._")
+      this.interpret("import spark.sql")
+      this.interpret("import org.apache.spark.sql.functions._")
+
+      // for feeding results to client, e.g. beeline
+      this.bind(
+        "result",
+        classOf[DataFrameHolder].getCanonicalName,
+        result)
+    }
+  }
+
+  def getResult(statementId: String): DataFrame = result.get(statementId)
+
+  def clearResult(statementId: String): Unit = result.unset(statementId)
+
+  def interpretWithRedirectOutError(statement: String): Results.Result = withLockRequired {
+    Console.withOut(output) {
+      Console.withErr(output) {
+        this.interpret(statement)
+      }
+    }
+  }
+
+  def getOutput: String = {
+    val res = output.toString.trim
+    output.reset()
+    res
+  }
+}
+
+private[spark] object KyuubiSparkILoop {
+  def apply(spark: SparkSession): KyuubiSparkILoop = {
+    val os = new ByteArrayOutputStream()
+    val iLoop = new KyuubiSparkILoop(spark, os)
+    iLoop.initialize()
+    iLoop
+  }
+
+  private val lock = new ReentrantLock()
+  private def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
+}
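
Judging by the `createInterpreter(settings)` and `initializeCompiler()` calls, this appears to be the variant adapted to the newer interpreter API; its counterpart under the other `scala-${scala.binary.version}` source directory keeps the pre-existing wiring, while both expose the same public surface (`interpretWithRedirectOutError`, `getResult`, `getOutput`) so that callers compile unchanged on either Scala version.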


@@ -32,7 +32,7 @@ import org.apache.kyuubi.events.handler.KafkaLoggingEventHandler._
  */
 class KafkaLoggingEventHandler(
     topic: String,
-    producerConf: Map[String, String],
+    producerConf: Iterable[(String, String)],
     kyuubiConf: KyuubiConf,
     closeTimeoutInMs: Long) extends EventHandler[KyuubiEvent] with Logging {
   private def defaultProducerConf: Properties = {


@@ -21,7 +21,7 @@ import org.apache.kyuubi.config.KyuubiConf
 case class ServerKafkaLoggingEventHandler(
     topic: String,
-    producerConf: Map[String, String],
+    producerConf: Iterable[(String, String)],
     kyuubiConf: KyuubiConf,
     closeTimeoutInMs: Long)
   extends KafkaLoggingEventHandler(topic, producerConf, kyuubiConf, closeTimeoutInMs)
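
Widening `producerConf` from `Map[String, String]` to `Iterable[(String, String)]` keeps every call site compiling on both Scala versions, because the mutable maps produced by `asScala` on 2.13 are no longer immutable `Map`s but are still `Iterable`s of pairs. A minimal sketch of the idea, with a hypothetical `toProps` helper:
```
import java.util.Properties
import scala.collection.JavaConverters._ // deprecated on 2.13 but cross-buildable

// Hypothetical helper: accepts anything iterable as key/value pairs, so both
// immutable Maps and the mutable Maps coming from asScala fit the signature.
def toProps(conf: Iterable[(String, String)]): Properties = {
  val props = new Properties()
  conf.foreach { case (k, v) => props.setProperty(k, v) }
  props
}

toProps(Map("bootstrap.servers" -> "localhost:9092"))    // immutable Map
toProps(new java.util.HashMap[String, String]().asScala) // mutable Map on 2.13
```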


@@ -311,6 +311,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
         node.instance,
         node.namespace,
         node.attributes.asJava))
+      .toSeq
   }

   @ApiResponse(
@@ -339,7 +340,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
         servers += ApiUtils.serverData(nodeInfo)
       })
     }
-    servers
+    servers.toSeq
   }

   private def getEngine(
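
Both `.toSeq` additions address the same Scala 2.13 change: the default `Seq` is now `scala.collection.immutable.Seq`, so a mutable accumulator (`ListBuffer`) or a mapped `asScala` buffer no longer conforms to a `Seq` result type. An illustrative sketch with stand-in methods:
```
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Stand-ins for the resource methods: on 2.13 both bodies need .toSeq,
// because Buffer/ListBuffer are no longer subtypes of the default Seq.
def listFromJava(nodes: java.util.List[String]): Seq[String] =
  nodes.asScala.map(_.trim).toSeq

def listAccumulated(): Seq[String] = {
  val servers = ListBuffer[String]()
  servers += "server-1"
  servers.toSeq
}
```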


@@ -258,6 +258,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
           case None => throw new IllegalStateException(
               s"can not find duplicated batch $batchId from metadata store")
         }
+      case Failure(cause) => throw new IllegalStateException(cause)
     }
   }
@@ -281,6 +282,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
           case None => throw new IllegalStateException(
               s"can not find duplicated batch $batchId from metadata store")
         }
+      case Failure(cause) => throw new IllegalStateException(cause)
     }
   }
 }
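
The added `Failure` branches make the `Try` matches exhaustive; Scala 2.13's stricter exhaustiveness checking flags the missing case, which fails the build when warnings are escalated to errors. A simplified sketch of the pattern, not the actual resource code:
```
import scala.util.{Failure, Success, Try}

// Simplified stand-in for the batch lookup: without the Failure case,
// Scala 2.13 reports "match may not be exhaustive" on the Try scrutinee.
def duplicatedBatch(attempt: Try[Option[String]], batchId: String): String =
  attempt match {
    case Success(Some(batch)) => batch
    case Success(None) =>
      throw new IllegalStateException(
        s"can not find duplicated batch $batchId from metadata store")
    case Failure(cause) => throw new IllegalStateException(cause)
  }
```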


@@ -261,7 +261,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
     queryBuilder.append(dialect.limitClause(size, from))
     val query = queryBuilder.toString
     JdbcUtils.withConnection { connection =>
-      withResultSet(connection, query, params: _*) { rs =>
+      withResultSet(connection, query, params.toSeq: _*) { rs =>
         buildMetadata(rs, stateOnly)
       }
     }
@@ -386,7 +386,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
     val query = queryBuilder.toString()
     JdbcUtils.withConnection { connection =>
-      withUpdateCount(connection, query, params: _*) { updateCount =>
+      withUpdateCount(connection, query, params.toSeq: _*) { updateCount =>
         if (updateCount == 0) {
           throw new KyuubiException(
             s"Error updating metadata for ${metadata.identifier} by SQL: $query, " +
@@ -470,7 +470,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
           peerInstanceClosed = peerInstanceClosed)
         metadataList += metadata
       }
-      metadataList
+      metadataList.toSeq
     } finally {
       Utils.tryLogNonFatalError(resultSet.close())
     }
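
`params` is accumulated in a mutable buffer, and on Scala 2.13 the varargs expansion `xs: _*` requires an immutable `Seq`, so the explicit `.toSeq` conversion is needed (it is effectively a no-op copy on 2.12). A sketch under that assumption, with a hypothetical `bind` standing in for `withResultSet`/`withUpdateCount`:
```
import scala.collection.mutable.ListBuffer

// Hypothetical vararg method standing in for withResultSet/withUpdateCount.
def bind(sql: String, params: Any*): String =
  s"$sql bound with ${params.mkString(", ")}"

val params = ListBuffer[Any]("RUNNING", 10)
// `params: _*` compiles on 2.12 but not 2.13 (Buffer is not an immutable Seq);
// `params.toSeq: _*` compiles on both.
bind("SELECT * FROM metadata WHERE state = ? LIMIT ?", params.toSeq: _*)
```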


@@ -84,7 +84,7 @@ case class MySQLColumnDefinition41Packet(
 case class MySQLTextResultSetRowPacket(
     sequenceId: Int,
-    row: Seq[Any]) extends MySQLPacket with SupportsEncode {
+    row: Iterable[Any]) extends MySQLPacket with SupportsEncode {

   private def nullVal = 0xFB


@@ -42,7 +42,7 @@ trait MySQLQueryResult {
   def toColDefinePackets: Seq[MySQLPacket]

-  def toRowPackets: Seq[MySQLPacket]
+  def toRowPackets: Iterable[MySQLPacket]

   def toPackets: Seq[MySQLPacket] = {
     val buf = Seq.newBuilder[MySQLPacket]
@@ -77,7 +77,7 @@ class MySQLSimpleQueryResult(
       decimals = decimals)
   }

-  override def toRowPackets: Seq[MySQLPacket] =
+  override def toRowPackets: Iterable[MySQLPacket] =
     rows.zipWithIndex.map { case (row, i) =>
       val sequenceId = colCount + 3 + i
       MySQLTextResultSetRowPacket(sequenceId = sequenceId, row = row)
@@ -94,8 +94,9 @@ class MySQLThriftQueryResult(
   override def toColDefinePackets: Seq[MySQLPacket] = schema.getColumns.asScala
     .zipWithIndex.map { case (tCol, i) => tColDescToMySQL(tCol, 2 + i) }
+    .toSeq

-  override def toRowPackets: Seq[MySQLPacket] = rows.getRows.asScala
+  override def toRowPackets: Iterable[MySQLPacket] = rows.getRows.asScala
     .zipWithIndex.map { case (tRow, i) => tRowToMySQL(tRow, colCount + 3 + i) }

   private def tColDescToMySQL(
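
Here the signature is widened instead of copying: `asScala` plus `map` yields a mutable `Buffer` on 2.13, which is an `Iterable` but not an immutable `Seq`, so returning `Iterable[MySQLPacket]` avoids a `.toSeq` copy on every result set. A rough sketch with a simplified packet type:
```
import scala.collection.JavaConverters._

// Simplified stand-in for MySQLPacket/MySQLTextResultSetRowPacket.
case class RowPacket(sequenceId: Int, row: Iterable[Any])

def toRowPackets(rows: java.util.List[java.util.List[Object]]): Iterable[RowPacket] =
  rows.asScala.zipWithIndex.map { case (r, i) =>
    RowPacket(sequenceId = i + 3, row = r.asScala)
  }
// The mapped Buffer satisfies Iterable on 2.12 and 2.13 alike; a Seq return
// type would have forced an extra .toSeq copy on 2.13.
```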


@@ -215,7 +215,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       batchRequest.getResource,
       batchRequest.getClassName,
       batchRequest.getConf.asScala.toMap,
-      batchRequest.getArgs.asScala,
+      batchRequest.getArgs.asScala.toSeq,
       None,
       shouldRunAsync)
     openBatchSession(batchSession)
@@ -240,7 +240,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
       className = batchRequest.getClassName,
       requestName = batchRequest.getName,
       requestConf = conf,
-      requestArgs = batchRequest.getArgs.asScala,
+      requestArgs = batchRequest.getArgs.asScala.toSeq,
       createTime = System.currentTimeMillis(),
       engineType = batchRequest.getBatchType)


@@ -560,11 +560,11 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     val result = response.readEntity(new GenericType[Seq[ServerData]]() {})
     assert(result.size == 1)
     val testServer = result.head
-    val export = fe.asInstanceOf[KyuubiRestFrontendService]
+    val restFrontendService = fe.asInstanceOf[KyuubiRestFrontendService]
     assert(namespace.equals(testServer.getNamespace.replaceFirst("/", "")))
-    assert(export.host.equals(testServer.getHost))
-    assert(export.connectionUrl.equals(testServer.getInstance()))
+    assert(restFrontendService.host.equals(testServer.getHost))
+    assert(restFrontendService.connectionUrl.equals(testServer.getInstance()))
     assert(!testServer.getAttributes.isEmpty)
     val attributes = testServer.getAttributes
     assert(attributes.containsKey("serviceUri") &&