[KYUUBI #5877][FOLLOWUP] Add spark output mode to better support PySpark notebook

# 🔍 Description
## Issue References 🔗

This pull request fixes #

## Describe Your Solution 🔧

Add a `kyuubi.engine.spark.output.mode` option with two modes, `AUTO` and `NOTEBOOK`, so the Spark engine can return the original Python response (instead of only the extracted `text/plain` field) to better support PySpark notebook clients.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [ ] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [ ] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone is correctly set.
- [ ] Test coverage is okay.
- [ ] Assignees are selected.
- [ ] Minimum number of approvals is met.
- [ ] No changes are requested.

**Be nice. Be informative.**

Closes #5898 from turboFei/notebook_mode.

Closes #5877

7f1c607b9 [Fei Wang] PySpark
644d036bc [Fei Wang] docs
7c68b7742 [Fei Wang] add option to support notebook well

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
Fei Wang 2023-12-21 21:26:35 -08:00
parent 42b08ba840
commit 58db6ca225
4 changed files with 35 additions and 12 deletions


@@ -181,6 +181,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 |
| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the Spark listener bus.</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> | seq | 1.7.0 |
| kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialization SQL for the Spark engine. It falls back to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 |
| kyuubi.engine.spark.output.mode | AUTO | The output mode of the Spark engine: <ul> <li>AUTO: For PySpark, the `text/plain` field extracted from the Python response is used as the output.</li> <li>NOTEBOOK: For PySpark, the original Python response is used as the output.</li></ul> | string | 1.9.0 |
| kyuubi.engine.spark.python.env.archive | &lt;undefined&gt; | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 |
| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 |
| kyuubi.engine.spark.python.home.archive | &lt;undefined&gt; | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 |
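
As an illustration, the new mode could be enabled server-wide in `$KYUUBI_HOME/conf/kyuubi-defaults.conf` (a sketch based on the table above):

```properties
# Return the raw Python (Jupyter-style) response to clients,
# so notebook frontends can render rich MIME outputs themselves.
kyuubi.engine.spark.output.mode=NOTEBOOK
```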


@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE, ENGINE_SPARK_PYTHON_MAGIC_ENABLED}
import org.apache.kyuubi.config.KyuubiConf.EngineSparkOutputMode.{AUTO, EngineSparkOutputMode, NOTEBOOK}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.util.JsonUtils
@@ -86,7 +87,7 @@ class ExecutePython(
val response = worker.runCode(statement)
val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
val output = response.map(_.content.getOutput()).getOrElse("")
val output = response.map(_.content.getOutput(outputMode)).getOrElse("")
val ename = response.map(_.content.getEname()).getOrElse("")
val evalue = response.map(_.content.getEvalue()).getOrElse("")
val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
@@ -403,18 +404,22 @@ case class PythonResponseContent(
evalue: String,
traceback: Seq[String],
status: String) {
def getOutput(): String = {
def getOutput(outputMode: EngineSparkOutputMode): String = {
if (data == null) return ""
// If data does not contain fields other than `text/plain`, keep backward compatibility;
// otherwise, return all the data.
if (data.filterNot(_._1 == "text/plain").isEmpty) {
data.get("text/plain").map {
case str: String => str
case obj => JsonUtils.toJson(obj)
}.getOrElse("")
} else {
JsonUtils.toJson(data)
outputMode match {
case AUTO =>
// If data does not contain fields other than `text/plain`, keep backward compatibility;
// otherwise, return all the data.
if (data.filterNot(_._1 == "text/plain").isEmpty) {
data.get("text/plain").map {
case str: String => str
case obj => JsonUtils.toJson(obj)
}.getOrElse("")
} else {
JsonUtils.toJson(data)
}
case NOTEBOOK => JsonUtils.toJson(data)
}
}
def getEname(): String = {

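The `AUTO`/`NOTEBOOK` branch above can be sketched in Python (illustrative only; the real implementation is the Scala `getOutput` in this diff, and `json.dumps` stands in for Kyuubi's `JsonUtils.toJson`):

```python
import json

def get_output(data, output_mode):
    """Mimic PythonResponseContent.getOutput: AUTO keeps the legacy
    text/plain extraction, NOTEBOOK returns the full MIME bundle."""
    if data is None:
        return ""
    if output_mode == "AUTO":
        # Only text/plain present: return it directly for backward compatibility.
        if all(key == "text/plain" for key in data):
            value = data.get("text/plain", "")
            return value if isinstance(value, str) else json.dumps(value)
        # Extra MIME types present: emit the whole response as JSON.
        return json.dumps(data)
    if output_mode == "NOTEBOOK":
        # Always hand the original response payload to the notebook client.
        return json.dumps(data)
    raise ValueError(f"unknown output mode: {output_mode}")
```

In `AUTO` mode a plain result stays a plain string, while a response carrying extra MIME parts (e.g. an inline image) is serialized whole; `NOTEBOOK` mode always serializes the full response.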

@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
@@ -82,6 +82,9 @@ abstract class SparkOperation(session: Session)
protected def supportProgress: Boolean = false
protected def outputMode: EngineSparkOutputMode.EngineSparkOutputMode =
EngineSparkOutputMode.withName(getSessionConf(ENGINE_SPARK_OUTPUT_MODE, spark))
override def getStatus: OperationStatus = {
if (progressEnable && supportProgress) {
val progressMonitor = new SparkProgressMonitor(spark, statementId)

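Because `outputMode` is read through `getSessionConf`, a client can set the mode per session rather than server-wide. A rough Python sketch of the resolution (hypothetical helper; the Scala code uses `getSessionConf` plus `Enumeration.withName`, which likewise fails on unknown names):

```python
VALID_MODES = {"AUTO", "NOTEBOOK"}

def resolve_output_mode(session_conf, default="AUTO"):
    """Read the session-level setting, fall back to the server default,
    and fail fast on values outside the enumeration."""
    mode = session_conf.get("kyuubi.engine.spark.output.mode", default)
    if mode not in VALID_MODES:
        raise ValueError(f"unknown output mode: {mode}")
    return mode
```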

@@ -3258,6 +3258,20 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
object EngineSparkOutputMode extends Enumeration {
type EngineSparkOutputMode = Value
val AUTO, NOTEBOOK = Value
}
val ENGINE_SPARK_OUTPUT_MODE: ConfigEntry[String] =
buildConf("kyuubi.engine.spark.output.mode")
.doc("The output mode of the Spark engine: <ul>" +
" <li>AUTO: For PySpark, the `text/plain` field extracted from the Python response is used as the output.</li>" +
" <li>NOTEBOOK: For PySpark, the original Python response is used as the output.</li></ul>")
.version("1.9.0")
.stringConf
.createWithDefault(EngineSparkOutputMode.AUTO.toString)
val ENGINE_SPARK_REGISTER_ATTRIBUTES: ConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.spark.register.attributes")
.internal