[KYUUBI #4150] Support to execute Scala statement synchronized to prevent conflicts

### _Why are the changes needed?_

Support to execute Scala statement synchronized to prevent conflicts, because they share the same `spark.repl.class.outputDir`.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4150 from turboFei/lock_scala.

Closes #4150

f11f12a26 [fwang12] lock more
d7a9fe8ed [fwang12] remove conf
d4175827e [fwang12] update docs
c1524a7fc [fwang12] lock required
a6e663be7 [fwang12] lock scala

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-01-12 20:58:49 +08:00
parent 3f8189994b
commit 4669163176

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.spark.repl package org.apache.kyuubi.engine.spark.repl
import java.io.{ByteArrayOutputStream, File} import java.io.{ByteArrayOutputStream, File}
import java.util.concurrent.locks.ReentrantLock
import scala.tools.nsc.Settings import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IR import scala.tools.nsc.interpreter.IR
@ -32,10 +33,11 @@ private[spark] case class KyuubiSparkILoop private (
spark: SparkSession, spark: SparkSession,
output: ByteArrayOutputStream) output: ByteArrayOutputStream)
extends SparkILoop(None, new JPrintWriter(output)) { extends SparkILoop(None, new JPrintWriter(output)) {
import KyuubiSparkILoop._
val result = new DataFrameHolder(spark) val result = new DataFrameHolder(spark)
private def initialize(): Unit = { private def initialize(): Unit = withLockRequired {
settings = new Settings settings = new Settings
val interpArguments = List( val interpArguments = List(
"-Yrepl-class-based", "-Yrepl-class-based",
@ -98,7 +100,7 @@ private[spark] case class KyuubiSparkILoop private (
def clearResult(statementId: String): Unit = result.unset(statementId) def clearResult(statementId: String): Unit = result.unset(statementId)
def interpretWithRedirectOutError(statement: String): IR.Result = { def interpretWithRedirectOutError(statement: String): IR.Result = withLockRequired {
Console.withOut(output) { Console.withOut(output) {
Console.withErr(output) { Console.withErr(output) {
this.interpret(statement) this.interpret(statement)
@ -120,4 +122,12 @@ private[spark] object KyuubiSparkILoop {
iLoop.initialize() iLoop.initialize()
iLoop iLoop
} }
private val lock = new ReentrantLock()
private def withLockRequired[T](block: => T): T = {
try {
lock.lock()
block
} finally lock.unlock()
}
} }