[KYUUBI #4028] [PYSPARK] Fix internal python code issue
### _Why are the changes needed?_
1. wrap the code with the correct delimiters
Before:
```
{"code":"spark.sparkContext.setJobGroup(07753dd9-804e-478f-b84f-bf0735732334, ..., true)","cmd":"run_code"}
```
After:
```
{"code":"spark.sparkContext.setJobGroup('07753dd9-804e-478f-b84f-bf0735732334', '...', True)","cmd":"run_code"}
```
2. using cancelJobGroup for pyspark
Before:
```
'SparkContext' object has no attribute 'clearJobGroup'
```
After:
Using SparkContext.cancelJobGroup
3. Simplify the internal python code and throw exception on failure
We cannot trust that the user code is formatted correctly, so we shall ensure that the internal Python code is simple and correct, to prevent it from breaking correctness or even causing results to arrive out of sequence.
For example, the user code might look like the snippet below (e.g. when the user invokes the executePython API):
```
spark.sql('123\'\n\b\t'
```
It is difficult to escape the user code when setting the job description to the statement itself.
So, in this PR, I simplify the job description to just record the statementId; the user can still check the original code in the log or on the UI.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4028 from turboFei/python_async_debug.
Closes #4028
51a4c5ea5 [fwang12] typo
6da88d1b3 [fwang12] code style
83f5a48f7 [fwang12] fail the internal python code
5f2db042c [fwang12] remove debug code
3a798cf6c [fwang12] Simplify the statement
c3b4640ca [fwang12] do not lock for close
009f66aaa [fwang12] add ReentrantLock for SessionPythonWorker run python code
39bd861a1 [fwang12] fix
4116dabbc [fwang12] job desc
f16c656fc [fwang12] escape
81db20ccb [fwang12] fix 'SparkContext' object has no attribute 'clearJobGroup'
985118e92 [fwang12] escape for python
f7250c114 [fwang12] revert withLocalProperties
13228f964 [fwang12] debug
e318c698a [fwang12] Revert "prevent timeout"
f81c605e0 [fwang12] prevent timeout
2ca5339e3 [fwang12] test
1390b0f21 [fwang12] remove not needed
26ee60275 [fwang12] remove not needed
93c08ff08 [fwang12] debug
Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
parent
a0a52befbe
commit
79091332ce
@ -23,6 +23,7 @@ import java.net.URI
|
||||
import java.nio.file.{Files, Path, Paths}
|
||||
import java.util.concurrent.RejectedExecutionException
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.ws.rs.core.UriBuilder
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
@ -99,7 +100,7 @@ class ExecutePython(
|
||||
}
|
||||
}
|
||||
|
||||
override protected def runInternal(): Unit = withLocalProperties {
|
||||
override protected def runInternal(): Unit = {
|
||||
addTimeoutMonitor(queryTimeout)
|
||||
if (shouldRunAsync) {
|
||||
val asyncOperation = new Runnable {
|
||||
@ -129,14 +130,20 @@ class ExecutePython(
|
||||
override def setSparkLocalProperty: (String, String) => Unit =
|
||||
(key: String, value: String) => {
|
||||
val valueStr = if (value == null) "None" else s"'$value'"
|
||||
worker.runCode(s"spark.sparkContext.setLocalProperty('$key', $valueStr)")
|
||||
worker.runCode(s"spark.sparkContext.setLocalProperty('$key', $valueStr)", internal = true)
|
||||
()
|
||||
}
|
||||
|
||||
override protected def withLocalProperties[T](f: => T): T = {
|
||||
try {
|
||||
worker.runCode("spark.sparkContext.setJobGroup" +
|
||||
s"($statementId, $redactedStatement, $forceCancel)")
|
||||
// to prevent the transferred set job group python code broken
|
||||
val jobDesc = s"Python statement: $statementId"
|
||||
// for python, the boolean value is capitalized
|
||||
val pythonForceCancel = if (forceCancel) "True" else "False"
|
||||
worker.runCode(
|
||||
"spark.sparkContext.setJobGroup" +
|
||||
s"('$statementId', '$jobDesc', $pythonForceCancel)",
|
||||
internal = true)
|
||||
setSparkLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
|
||||
setSparkLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
|
||||
schedulerPool match {
|
||||
@ -153,7 +160,8 @@ class ExecutePython(
|
||||
setSparkLocalProperty(KYUUBI_SESSION_USER_KEY, "")
|
||||
setSparkLocalProperty(KYUUBI_STATEMENT_ID_KEY, "")
|
||||
setSparkLocalProperty(SPARK_SCHEDULER_POOL_KEY, "")
|
||||
worker.runCode("spark.sparkContext.clearJobGroup()")
|
||||
// using cancelJobGroup for pyspark, see details in pyspark/context.py
|
||||
worker.runCode(s"spark.sparkContext.cancelJobGroup('$statementId')", internal = true)
|
||||
if (isSessionUserSignEnabled) {
|
||||
clearSessionUserSign()
|
||||
}
|
||||
@ -168,15 +176,38 @@ case class SessionPythonWorker(
|
||||
private val stdin: PrintWriter = new PrintWriter(workerProcess.getOutputStream)
|
||||
private val stdout: BufferedReader =
|
||||
new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1)
|
||||
private val lock = new ReentrantLock()
|
||||
|
||||
def runCode(code: String): Option[PythonResponse] = {
|
||||
private def withLockRequired[T](block: => T): T = {
|
||||
try {
|
||||
lock.lock()
|
||||
block
|
||||
} finally lock.unlock()
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the python code and return the response. This method maybe invoked internally,
|
||||
* such as setJobGroup and cancelJobGroup, if the internal python code is not formatted correctly,
|
||||
* it might impact the correctness and even cause result out of sequence. To prevent that,
|
||||
* please make sure the internal python code simple and set internal flag, to be aware of the
|
||||
* internal python code failure.
|
||||
*
|
||||
* @param code the python code
|
||||
* @param internal whether is internal python code
|
||||
* @return the python response
|
||||
*/
|
||||
def runCode(code: String, internal: Boolean = false): Option[PythonResponse] = withLockRequired {
|
||||
val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code"))
|
||||
// scalastyle:off println
|
||||
stdin.println(input)
|
||||
// scalastyle:on
|
||||
stdin.flush()
|
||||
Option(stdout.readLine())
|
||||
.map(ExecutePython.fromJson[PythonResponse](_))
|
||||
val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_))
|
||||
// throw exception if internal python code fail
|
||||
if (internal && pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)) {
|
||||
throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse")
|
||||
}
|
||||
pythonResponse
|
||||
}
|
||||
|
||||
def close(): Unit = {
|
||||
|
||||
@ -72,11 +72,7 @@ class PySparkTests extends WithSparkSQLEngine with HiveJDBCTestHelper {
|
||||
val statement = connection.createStatement().asInstanceOf[KyuubiStatement]
|
||||
statement.setQueryTimeout(5)
|
||||
try {
|
||||
var code =
|
||||
"""
|
||||
|import time
|
||||
|time.sleep(10)
|
||||
|""".stripMargin
|
||||
var code = "spark.sql(\"select java_method('java.lang.Thread', 'sleep', 10000L)\").show()"
|
||||
var e = intercept[SQLTimeoutException] {
|
||||
statement.executePython(code)
|
||||
}.getMessage
|
||||
|
||||
Loading…
Reference in New Issue
Block a user