diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py index 251be544c..2d7ce4e0f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py +++ b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py @@ -28,22 +28,27 @@ from glob import glob if sys.version_info[0] < 3: sys.exit("Python < 3 is unsupported.") -spark_home = os.environ.get("SPARK_HOME", "") os.environ["PYSPARK_PYTHON"] = os.environ.get("PYSPARK_PYTHON", sys.executable) # add pyspark to sys.path if "pyspark" not in sys.modules: - spark_python = os.path.join(spark_home, "python") - try: - py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0] - except IndexError: - raise Exception( - "Unable to find py4j in {}, your SPARK_HOME may not be configured correctly".format( - spark_python + # try to get PY4J_PATH and use it directly if not none + py4j_path = os.environ.get("PY4J_PATH") + if py4j_path is not None: + sys.path[:0] = sys_path = [py4j_path] + else: + spark_home = os.environ.get("SPARK_HOME", "") + spark_python = os.path.join(spark_home, "python") + try: + py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0] + except IndexError: + raise Exception( + "Unable to find py4j in {}, your SPARK_HOME may not be configured correctly".format( + spark_python + ) ) - ) - sys.path[:0] = sys_path = [spark_python, py4j] + sys.path[:0] = sys_path = [spark_python, py4j] else: # already imported, no need to patch sys.path sys_path = None diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index e42e05f03..70b349ce9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.types.StructType import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE} import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY} -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ import org.apache.kyuubi.operation.ArrayFetchIterator import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -69,6 +69,7 @@ class ExecutePython( override protected def runInternal(): Unit = withLocalProperties { try { + info(diagnostics) val response = worker.runCode(statement) val output = response.map(_.content.getOutput()).getOrElse("") val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS") @@ -152,6 +153,9 @@ case class SessionPythonWorker( object ExecutePython extends Logging { final val DEFAULT_SPARK_PYTHON_HOME_ARCHIVE_FRAGMENT = "__kyuubi_spark_python_home__" final val DEFAULT_SPARK_PYTHON_ENV_ARCHIVE_FRAGMENT = "__kyuubi_spark_python_env__" + final val PY4J_REGEX = "py4j-[\\S]*.zip$".r + final val PY4J_PATH = "PY4J_PATH" + final val IS_PYTHON_APP_KEY = "spark.yarn.isPython" private val isPythonGatewayStart = new AtomicBoolean(false) private val kyuubiPythonPath = Utils.createTempDir() @@ -184,11 +188,18 @@ object ExecutePython extends Logging { .split(File.pathSeparator) .++(ExecutePython.kyuubiPythonPath.toString) env.put("PYTHONPATH", pythonPath.mkString(File.pathSeparator)) - env.put( - "SPARK_HOME", - sys.env.getOrElse( + // try to find py4j lib from `PYTHONPATH` and set env `PY4J_PATH` into process if found + pythonPath.mkString(File.pathSeparator) + .split(File.pathSeparator) + .find(PY4J_REGEX.findFirstMatchIn(_).nonEmpty) + .foreach(env.put(PY4J_PATH, _)) + if (!spark.sparkContext.getConf.getBoolean(IS_PYTHON_APP_KEY, false)) { + env.put( "SPARK_HOME", - getSparkPythonHomeFromArchive(spark, session).getOrElse(defaultSparkHome))) + sys.env.getOrElse( + "SPARK_HOME", + getSparkPythonHomeFromArchive(spark, session).getOrElse(defaultSparkHome))) + } env.put("PYTHON_GATEWAY_CONNECTION_INFO", KyuubiPythonGatewayServer.CONNECTION_FILE_PATH) logger.info( s"""