[KYUUBI #3900] Fallback to SQL language mode if fail to init python environment and fix log

### _Why are the changes needed?_

Currently, if the Python environment fails to initialize, we have to re-create the connection.

In this PR, the session will fall back to SQL language mode in the above case.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate
<img width="1522" alt="image" src="https://user-images.githubusercontent.com/6757692/205556516-f8b87c3e-34ae-441f-ae42-ba4e3df9e0a4.png">

<img width="1416" alt="image" src="https://user-images.githubusercontent.com/6757692/205554712-0f399d7a-cab8-4fcb-a0f7-4f978bfb3d19.png">

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3900 from turboFei/python_log_follow.

Closes #3900

d0cf7c2fe [fwang12] provide more info
63bca0b07 [fwang12] using Utils.createTempDir
f669e78d7 [fwang12] fall back to sql

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2022-12-05 15:04:55 +08:00
parent b4b579d48e
commit 18d9885a3e
3 changed files with 35 additions and 16 deletions

View File

@ -34,7 +34,7 @@ import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.Logging
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
@ -62,15 +62,24 @@ class ExecutePython(
}
}
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
super.beforeRun()
}
override protected def runInternal(): Unit = withLocalProperties {
val response = worker.runCode(statement)
val output = response.map(_.content.getOutput()).getOrElse("")
val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
val ename = response.map(_.content.getEname()).getOrElse("")
val evalue = response.map(_.content.getEvalue()).getOrElse("")
val traceback = response.map(_.content.getTraceback()).getOrElse(Array.empty)
iter =
new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, Row(traceback: _*))))
try {
val response = worker.runCode(statement)
val output = response.map(_.content.getOutput()).getOrElse("")
val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
val ename = response.map(_.content.getEname()).getOrElse("")
val evalue = response.map(_.content.getEvalue()).getOrElse("")
val traceback = response.map(_.content.getTraceback()).getOrElse(Array.empty)
iter =
new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, Row(traceback: _*))))
} catch {
onError(cancel = true)
}
}
override def setSparkLocalProperty: (String, String) => Unit =
@ -145,7 +154,7 @@ object ExecutePython extends Logging {
final val DEFAULT_SPARK_PYTHON_ENV_ARCHIVE_FRAGMENT = "__kyuubi_spark_python_env__"
private val isPythonGatewayStart = new AtomicBoolean(false)
private val kyuubiPythonPath = Files.createTempDirectory("")
private val kyuubiPythonPath = Utils.createTempDir()
def init(): Unit = {
if (!isPythonGatewayStart.get()) {
synchronized {

View File

@ -90,11 +90,19 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
new ExecuteScala(session, repl, statement)
case OperationLanguages.PYTHON =>
ExecutePython.init()
val worker = sessionToPythonProcess.getOrElseUpdate(
session.handle,
ExecutePython.createSessionPythonWorker(spark, session))
new ExecutePython(session, statement, worker)
try {
ExecutePython.init()
val worker = sessionToPythonProcess.getOrElseUpdate(
session.handle,
ExecutePython.createSessionPythonWorker(spark, session))
new ExecutePython(session, statement, worker)
} catch {
case e: Throwable =>
spark.conf.set(OPERATION_LANGUAGE.key, OperationLanguages.SQL.toString)
throw KyuubiSQLException(
s"Failed to init python environment, fall back to SQL mode: ${e.getMessage}",
e)
}
case OperationLanguages.UNKNOWN =>
spark.conf.unset(OPERATION_LANGUAGE.key)
throw KyuubiSQLException(s"The operation language $lang" +

View File

@ -24,9 +24,11 @@ import java.nio.file.Files
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.kyuubi.Utils
object KyuubiPythonGatewayServer extends Logging {
val CONNECTION_FILE_PATH = Files.createTempDirectory("") + "/connection.info"
val CONNECTION_FILE_PATH = Utils.createTempDir() + "/connection.info"
def start(): Unit = {