diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
index c5f25a834..27090fae4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.engine.spark.repl
 
 import java.io.{ByteArrayOutputStream, File}
+import java.util.concurrent.locks.ReentrantLock
 
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter.IR
@@ -32,10 +33,11 @@ private[spark] case class KyuubiSparkILoop private (
     spark: SparkSession,
     output: ByteArrayOutputStream)
   extends SparkILoop(None, new JPrintWriter(output)) {
+  import KyuubiSparkILoop._
 
   val result = new DataFrameHolder(spark)
 
-  private def initialize(): Unit = {
+  private def initialize(): Unit = withLockRequired {
     settings = new Settings
     val interpArguments = List(
       "-Yrepl-class-based",
@@ -98,7 +100,7 @@ private[spark] case class KyuubiSparkILoop private (
 
   def clearResult(statementId: String): Unit = result.unset(statementId)
 
-  def interpretWithRedirectOutError(statement: String): IR.Result = {
+  def interpretWithRedirectOutError(statement: String): IR.Result = withLockRequired {
     Console.withOut(output) {
       Console.withErr(output) {
         this.interpret(statement)
@@ -120,4 +122,12 @@ private[spark] object KyuubiSparkILoop {
     iLoop.initialize()
     iLoop
   }
+
+  private val lock = new ReentrantLock()
+  private def withLockRequired[T](block: => T): T = {
+    lock.lock()
+    try {
+      block
+    } finally lock.unlock()
+  }
 }