From 79091332ce3825cf37260a25fc14713e3d90b055 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 27 Dec 2022 11:03:32 +0800 Subject: [PATCH] [KYUUBI #4028] [PYSPARK] Fix internal python code issue ### _Why are the changes needed?_ 1. Wrap the code with the correct delimiters Before: ``` {"code":"spark.sparkContext.setJobGroup(07753dd9-804e-478f-b84f-bf0735732334, ..., true)","cmd":"run_code"} ``` After: ``` {"code":"spark.sparkContext.setJobGroup('07753dd9-804e-478f-b84f-bf0735732334', '...', True)","cmd":"run_code"} ``` 2. Use cancelJobGroup for PySpark Before: ``` 'SparkContext' object has no attribute 'clearJobGroup' ``` After: Using SparkContext.cancelJobGroup 3. Simplify the internal python code and throw an exception on failure We cannot trust that the user code is formatted correctly, so we shall ensure the internal Python code is simple and correct, to prevent correctness issues and even out-of-sequence results. For example, the user code might be as below (perhaps the user invokes the executePython API) ``` spark.sql('123\'\n\b\t' ``` It is difficult to escape the user code and set the job description to the statement. So, in this PR, I simplify the job description to just record its statementId; users can check the original code in the logs or on the UI. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #4028 from turboFei/python_async_debug. 
Closes #4028 51a4c5ea5 [fwang12] typo 6da88d1b3 [fwang12] code style 83f5a48f7 [fwang12] fail the internal python code 5f2db042c [fwang12] remove debug code 3a798cf6c [fwang12] Simplify the statement c3b4640ca [fwang12] do not lock for close 009f66aaa [fwang12] add ReentrantLock for SessionPythonWorker run python code 39bd861a1 [fwang12] fix 4116dabbc [fwang12] job desc f16c656fc [fwang12] escape 81db20ccb [fwang12] fix 'SparkContext' object has no attribute 'clearJobGroup' 985118e92 [fwang12] escape for python f7250c114 [fwang12] revert withLocalProperties 13228f964 [fwang12] debug e318c698a [fwang12] Revert "prevent timeout" f81c605e0 [fwang12] prevent timeout 2ca5339e3 [fwang12] test 1390b0f21 [fwang12] remove not needed 26ee60275 [fwang12] remove not needed 93c08ff08 [fwang12] debug Authored-by: fwang12 Signed-off-by: fwang12 --- .../spark/operation/ExecutePython.scala | 47 +++++++++++++++---- .../engine/spark/operation/PySparkTests.scala | 6 +-- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index a23a9f36f..a27a8a023 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -23,6 +23,7 @@ import java.net.URI import java.nio.file.{Files, Path, Paths} import java.util.concurrent.RejectedExecutionException import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock import javax.ws.rs.core.UriBuilder import scala.collection.JavaConverters._ @@ -99,7 +100,7 @@ class ExecutePython( } } - override protected def runInternal(): Unit = withLocalProperties { + override protected def runInternal(): Unit = { 
addTimeoutMonitor(queryTimeout) if (shouldRunAsync) { val asyncOperation = new Runnable { @@ -129,14 +130,20 @@ class ExecutePython( override def setSparkLocalProperty: (String, String) => Unit = (key: String, value: String) => { val valueStr = if (value == null) "None" else s"'$value'" - worker.runCode(s"spark.sparkContext.setLocalProperty('$key', $valueStr)") + worker.runCode(s"spark.sparkContext.setLocalProperty('$key', $valueStr)", internal = true) () } override protected def withLocalProperties[T](f: => T): T = { try { - worker.runCode("spark.sparkContext.setJobGroup" + - s"($statementId, $redactedStatement, $forceCancel)") + // to prevent the transferred set job group python code broken + val jobDesc = s"Python statement: $statementId" + // for python, the boolean value is capitalized + val pythonForceCancel = if (forceCancel) "True" else "False" + worker.runCode( + "spark.sparkContext.setJobGroup" + + s"('$statementId', '$jobDesc', $pythonForceCancel)", + internal = true) setSparkLocalProperty(KYUUBI_SESSION_USER_KEY, session.user) setSparkLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId) schedulerPool match { @@ -153,7 +160,8 @@ class ExecutePython( setSparkLocalProperty(KYUUBI_SESSION_USER_KEY, "") setSparkLocalProperty(KYUUBI_STATEMENT_ID_KEY, "") setSparkLocalProperty(SPARK_SCHEDULER_POOL_KEY, "") - worker.runCode("spark.sparkContext.clearJobGroup()") + // using cancelJobGroup for pyspark, see details in pyspark/context.py + worker.runCode(s"spark.sparkContext.cancelJobGroup('$statementId')", internal = true) if (isSessionUserSignEnabled) { clearSessionUserSign() } @@ -168,15 +176,38 @@ case class SessionPythonWorker( private val stdin: PrintWriter = new PrintWriter(workerProcess.getOutputStream) private val stdout: BufferedReader = new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1) + private val lock = new ReentrantLock() - def runCode(code: String): Option[PythonResponse] = { + private def withLockRequired[T](block: => T): T = 
{ + try { + lock.lock() + block + } finally lock.unlock() + } + + /** + * Run the python code and return the response. This method maybe invoked internally, + * such as setJobGroup and cancelJobGroup, if the internal python code is not formatted correctly, + * it might impact the correctness and even cause result out of sequence. To prevent that, + * please make sure the internal python code simple and set internal flag, to be aware of the + * internal python code failure. + * + * @param code the python code + * @param internal whether is internal python code + * @return the python response + */ + def runCode(code: String, internal: Boolean = false): Option[PythonResponse] = withLockRequired { val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code")) // scalastyle:off println stdin.println(input) // scalastyle:on stdin.flush() - Option(stdout.readLine()) - .map(ExecutePython.fromJson[PythonResponse](_)) + val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_)) + // throw exception if internal python code fail + if (internal && pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)) { + throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse") + } + pythonResponse } def close(): Unit = { diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PySparkTests.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PySparkTests.scala index d47c64fb0..e2dd2609d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PySparkTests.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PySparkTests.scala @@ -72,11 +72,7 @@ class PySparkTests extends WithSparkSQLEngine with HiveJDBCTestHelper { val statement = connection.createStatement().asInstanceOf[KyuubiStatement] statement.setQueryTimeout(5) try { - 
var code = - """ - |import time - |time.sleep(10) - |""".stripMargin + var code = "spark.sql(\"select java_method('java.lang.Thread', 'sleep', 10000L)\").show()" var e = intercept[SQLTimeoutException] { statement.executePython(code) }.getMessage