diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index d35c3fbd4..7009bf9e0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -28,8 +28,6 @@ import javax.ws.rs.core.UriBuilder import scala.collection.JavaConverters._ -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkFiles import org.apache.spark.api.python.KyuubiPythonGatewayServer @@ -40,6 +38,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE, ENGINE_SPARK_PYTHON_MAGIC_ENABLED} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ +import org.apache.kyuubi.engine.spark.util.JsonUtils import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -95,7 +94,8 @@ class ExecutePython( new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback))) setState(OperationState.FINISHED) } else { - throw KyuubiSQLException(s"Interpret error:\n$statement\n $response") + throw KyuubiSQLException(s"Interpret error:\n" + + s"${JsonUtils.toPrettyJson(Map("code" -> statement, "response" -> response.orNull))}") } } } catch { @@ -200,12 +200,12 @@ case class SessionPythonWorker( throw KyuubiSQLException("Python worker process has been exited, please check the error log" + " and re-create the session to run python code.") } - val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code")) + val input = JsonUtils.toJson(Map("code" -> code, "cmd" -> "run_code")) // scalastyle:off println stdin.println(input) // scalastyle:on stdin.flush() - val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_)) + val pythonResponse = Option(stdout.readLine()).map(JsonUtils.fromJson[PythonResponse](_)) // throw exception if internal python code fail if (internal && !pythonResponse.map(_.content.status).contains(PythonResponse.OK_STATUS)) { throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse") @@ -214,7 +214,7 @@ case class SessionPythonWorker( } def close(): Unit = { - val exitCmd = ExecutePython.toJson(Map("cmd" -> "exit_worker")) + val exitCmd = JsonUtils.toJson(Map("cmd" -> "exit_worker")) // scalastyle:off println stdin.println(exitCmd) // scalastyle:on @@ -387,19 +387,6 @@ object ExecutePython extends Logging { sink.close() file } - - val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule) - def toJson[T](obj: T): String = { - mapper.writeValueAsString(obj) - } - def fromJson[T](json: String, clz: Class[T]): T = { - mapper.readValue(json, clz) - } - - def fromJson[T](json: String)(implicit m: Manifest[T]): T = { - mapper.readValue(json, m.runtimeClass).asInstanceOf[T] - } - } case class PythonResponse( @@ -424,10 +411,10 @@ case class PythonResponseContent( if (data.filterNot(_._1 == "text/plain").isEmpty) { data.get("text/plain").map { case str: String => str - case obj => ExecutePython.toJson(obj) + case obj => JsonUtils.toJson(obj) }.getOrElse("") } else { - ExecutePython.toJson(data) + JsonUtils.toJson(data) } } def getEname(): String = { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index 691c4fb32..092e6e824 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop +import org.apache.kyuubi.engine.spark.util.JsonUtils import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -119,7 +120,8 @@ class ExecuteScala( } } case Error => - throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}") + throw KyuubiSQLException(s"Interpret error:\n" + + s"${JsonUtils.toPrettyJson(Map("code" -> statement, "response" -> repl.getOutput))}") case Incomplete => throw KyuubiSQLException(s"Incomplete code:\n$statement") } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/JsonUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/JsonUtils.scala new file mode 100644 index 000000000..192c6dbb4 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/JsonUtils.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.util + +import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +object JsonUtils { + val mapper: ObjectMapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .registerModule(DefaultScalaModule) + + def toJson[T](obj: T): String = { + mapper.writeValueAsString(obj) + } + + def toPrettyJson[T](obj: T): String = { + mapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj) + } + + def fromJson[T](json: String, clz: Class[T]): T = { + mapper.readValue(json, clz) + } + + def fromJson[T](json: String)(implicit m: Manifest[T]): T = { + mapper.readValue(json, m.runtimeClass).asInstanceOf[T] + } + + def readTree(content: String): JsonNode = { + mapper.readTree(content) + } +}