[KYUUBI #3908] Support to use builtin spark python lib with spark.yarn.isPython

### _Why are the changes needed?_

If the master is YARN and `spark.yarn.isPython` is true, Spark will submit the builtin Python lib (including pyspark.zip and py4j-*.zip) by default.

Support to use builtin spark python lib with `spark.yarn.isPython`.

- try to get the py4j lib from `PYTHONPATH` and set it as the value of `PY4J_PATH` for the Python process
- if `spark.yarn.isPython` is true, there is no need to set `SPARK_HOME`
- in `execute_python.py`, use the `PY4J_PATH` environment variable if it is set

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate
<img width="1724" alt="image" src="https://user-images.githubusercontent.com/6757692/205811759-b2bc3734-96c6-492c-875c-a5c273b9f58b.png">
<img width="1336" alt="image" src="https://user-images.githubusercontent.com/6757692/205823571-87830b6c-53a0-4f5f-bad3-c28d890bcaa3.png">

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3908 from turboFei/skip_spark_home_yarn_python.

Closes #3908

4901e849d [fwang12] add comments
2d500fe72 [fwang12] fix style
86c0c9650 [fwang12] modify python

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
This commit is contained in:
fwang12 2022-12-06 14:59:38 +08:00 committed by Fu Chen
parent a0fc33c6af
commit ade2a53cc3
2 changed files with 31 additions and 15 deletions

View File

@ -28,22 +28,27 @@ from glob import glob
if sys.version_info[0] < 3:
sys.exit("Python < 3 is unsupported.")
spark_home = os.environ.get("SPARK_HOME", "")
os.environ["PYSPARK_PYTHON"] = os.environ.get("PYSPARK_PYTHON", sys.executable)
# add pyspark to sys.path
if "pyspark" not in sys.modules:
spark_python = os.path.join(spark_home, "python")
try:
py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
except IndexError:
raise Exception(
"Unable to find py4j in {}, your SPARK_HOME may not be configured correctly".format(
spark_python
# try to get PY4J_PATH and use it directly if not none
py4j_path = os.environ.get("PY4J_PATH")
if py4j_path is not None:
sys.path[:0] = sys_path = [py4j_path]
else:
spark_home = os.environ.get("SPARK_HOME", "")
spark_python = os.path.join(spark_home, "python")
try:
py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
except IndexError:
raise Exception(
"Unable to find py4j in {}, your SPARK_HOME may not be configured correctly".format(
spark_python
)
)
)
sys.path[:0] = sys_path = [spark_python, py4j]
sys.path[:0] = sys_path = [spark_python, py4j]
else:
# already imported, no need to patch sys.path
sys_path = None

View File

@ -37,7 +37,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.operation.ArrayFetchIterator
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@ -69,6 +69,7 @@ class ExecutePython(
override protected def runInternal(): Unit = withLocalProperties {
try {
info(diagnostics)
val response = worker.runCode(statement)
val output = response.map(_.content.getOutput()).getOrElse("")
val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
@ -152,6 +153,9 @@ case class SessionPythonWorker(
object ExecutePython extends Logging {
final val DEFAULT_SPARK_PYTHON_HOME_ARCHIVE_FRAGMENT = "__kyuubi_spark_python_home__"
final val DEFAULT_SPARK_PYTHON_ENV_ARCHIVE_FRAGMENT = "__kyuubi_spark_python_env__"
final val PY4J_REGEX = "py4j-[\\S]*.zip$".r
final val PY4J_PATH = "PY4J_PATH"
final val IS_PYTHON_APP_KEY = "spark.yarn.isPython"
private val isPythonGatewayStart = new AtomicBoolean(false)
private val kyuubiPythonPath = Utils.createTempDir()
@ -184,11 +188,18 @@ object ExecutePython extends Logging {
.split(File.pathSeparator)
.++(ExecutePython.kyuubiPythonPath.toString)
env.put("PYTHONPATH", pythonPath.mkString(File.pathSeparator))
env.put(
"SPARK_HOME",
sys.env.getOrElse(
// try to find py4j lib from `PYTHONPATH` and set env `PY4J_PATH` into process if found
pythonPath.mkString(File.pathSeparator)
.split(File.pathSeparator)
.find(PY4J_REGEX.findFirstMatchIn(_).nonEmpty)
.foreach(env.put(PY4J_PATH, _))
if (!spark.sparkContext.getConf.getBoolean(IS_PYTHON_APP_KEY, false)) {
env.put(
"SPARK_HOME",
getSparkPythonHomeFromArchive(spark, session).getOrElse(defaultSparkHome)))
sys.env.getOrElse(
"SPARK_HOME",
getSparkPythonHomeFromArchive(spark, session).getOrElse(defaultSparkHome)))
}
env.put("PYTHON_GATEWAY_CONNECTION_INFO", KyuubiPythonGatewayServer.CONNECTION_FILE_PATH)
logger.info(
s"""