From a5a3e201a79f5d7aeb684a51d452bf47bc7217a3 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 27 Dec 2022 15:01:36 +0800 Subject: [PATCH] [KYUUBI #4026] [PYSPARK] Fail if the session python worker process has been exited MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### _Why are the changes needed?_ Before, if the pyspark environment is not set up correctly,the python response was always `None`. In this pr, fail if the session python worker process has been exited. BTW: Filter the empty log. image ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4026 from turboFei/python_exec. Closes #4026 499e19b54 [fwang12] more insights 17cefc02e [fwang12] Fail if the session python worker has been exited Authored-by: fwang12 Signed-off-by: fwang12 --- .../kyuubi/engine/spark/operation/ExecutePython.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index a27a8a023..06d1e19a2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -197,6 +197,10 @@ case class SessionPythonWorker( * @return the python response */ def runCode(code: String, internal: Boolean = false): Option[PythonResponse] = withLockRequired { + if (!workerProcess.isAlive) { + throw KyuubiSQLException("Python worker process has been exited, please check the error log" + + " and re-create the session to run python code.") + } val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code")) // scalastyle:off println stdin.println(input) @@ -337,7 +341,7 @@ object ExecutePython extends Logging { val stderrThread = new Thread("process stderr thread") { override def run(): Unit = { val lines = scala.io.Source.fromInputStream(process.getErrorStream).getLines() - lines.foreach(logger.error) + lines.filter(_.trim.nonEmpty).foreach(logger.error) } } stderrThread.setDaemon(true)