From 58db6ca2256d9d018b3fa58bc894fcd281b2c443 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Thu, 21 Dec 2023 21:26:35 -0800 Subject: [PATCH] [KYUUBI #5877][FOLLOWUP] Add spark output mode to better support PySpark notebook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 This pull request fixes # ## Describe Your Solution 🔧 Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklists ## 📝 Author Self Checklist - [ ] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [ ] I have performed a self-review - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [ ] Pull request title is okay. - [ ] No license issues. - [ ] Milestone correctly set? - [ ] Test coverage is ok - [ ] Assignees are selected. - [ ] Minimum number of approvals - [ ] No changes are requested **Be nice. Be informative.** Closes #5898 from turboFei/notebook_mode. 
Closes #5877 7f1c607b9 [Fei Wang] PySpark 644d036bc [Fei Wang] docs 7c68b7742 [Fei Wang] add option to support notebook well Authored-by: Fei Wang Signed-off-by: Fei Wang --- docs/configuration/settings.md | 1 + .../spark/operation/ExecutePython.scala | 27 +++++++++++-------- .../spark/operation/SparkOperation.scala | 5 +++- .../org/apache/kyuubi/config/KyuubiConf.scala | 14 ++++++++++ 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index b42bdcf10..07ae62771 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -181,6 +181,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 | | kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | | kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | +| kyuubi.engine.spark.output.mode | AUTO | The output mode of Spark engine:
  • AUTO: For PySpark, the extracted `text/plain` from python response as output.
  • NOTEBOOK: For PySpark, the original python response as output.
| string | 1.9.0 | | kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | | kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | | kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index 7009bf9e0..f60b1d4c8 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.types.StructType import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE, ENGINE_SPARK_PYTHON_MAGIC_ENABLED} +import org.apache.kyuubi.config.KyuubiConf.EngineSparkOutputMode.{AUTO, EngineSparkOutputMode, NOTEBOOK} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ import org.apache.kyuubi.engine.spark.util.JsonUtils @@ -86,7 +87,7 @@ class ExecutePython( val response = worker.runCode(statement) val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS") if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) { - val output = response.map(_.content.getOutput()).getOrElse("") + val output = response.map(_.content.getOutput(outputMode)).getOrElse("") val ename = 
response.map(_.content.getEname()).getOrElse("") val evalue = response.map(_.content.getEvalue()).getOrElse("") val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty) @@ -403,18 +404,22 @@ case class PythonResponseContent( evalue: String, traceback: Seq[String], status: String) { - def getOutput(): String = { + def getOutput(outputMode: EngineSparkOutputMode): String = { if (data == null) return "" - // If data does not contains field other than `test/plain`, keep backward compatibility, - // otherwise, return all the data. - if (data.filterNot(_._1 == "text/plain").isEmpty) { - data.get("text/plain").map { - case str: String => str - case obj => JsonUtils.toJson(obj) - }.getOrElse("") - } else { - JsonUtils.toJson(data) + outputMode match { + case AUTO => + // If data does not contains field other than `test/plain`, keep backward compatibility, + // otherwise, return all the data. + if (data.filterNot(_._1 == "text/plain").isEmpty) { + data.get("text/plain").map { + case str: String => str + case obj => JsonUtils.toJson(obj) + }.getOrElse("") + } else { + JsonUtils.toJson(data) + } + case NOTEBOOK => JsonUtils.toJson(data) } } def getEname(): String = { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 1d271cfce..88ebc306b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{BinaryType, StructField, StructType} import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, 
OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED} +import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY} import org.apache.kyuubi.engine.spark.events.SparkOperationEvent @@ -82,6 +82,9 @@ abstract class SparkOperation(session: Session) protected def supportProgress: Boolean = false + protected def outputMode: EngineSparkOutputMode.EngineSparkOutputMode = + EngineSparkOutputMode.withName(getSessionConf(ENGINE_SPARK_OUTPUT_MODE, spark)) + override def getStatus: OperationStatus = { if (progressEnable && supportProgress) { val progressMonitor = new SparkProgressMonitor(spark, statementId) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index fd01e718c..f9bca31ce 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -3258,6 +3258,20 @@ object KyuubiConf { .booleanConf .createWithDefault(true) + object EngineSparkOutputMode extends Enumeration { + type EngineSparkOutputMode = Value + val AUTO, NOTEBOOK = Value + } + + val ENGINE_SPARK_OUTPUT_MODE: ConfigEntry[String] = + buildConf("kyuubi.engine.spark.output.mode") + .doc("The output mode of Spark engine:
    " + + "
  • AUTO: For PySpark, the extracted `text/plain` from python response as output.
  • " + + "
  • NOTEBOOK: For PySpark, the original python response as output.
") + .version("1.9.0") + .stringConf + .createWithDefault(EngineSparkOutputMode.AUTO.toString) + val ENGINE_SPARK_REGISTER_ATTRIBUTES: ConfigEntry[Seq[String]] = buildConf("kyuubi.engine.spark.register.attributes") .internal