# PySpark

PySpark is an interface for Apache Spark in Python. Kyuubi can be used as a JDBC source in PySpark.
## Requirements
PySpark works with Python 3.7 and above.
Install PySpark with Spark SQL and optional pandas on Spark using PyPI as follows:
```shell
pip install pyspark 'pyspark[sql]' 'pyspark[pandas_on_spark]'
```
For installation via Conda or manual download, please refer to PySpark installation.
## Preparation

### Prepare JDBC driver

Prepare the JDBC driver jar file. The supported Hive-compatible JDBC drivers are listed below:
| Driver | Driver Class Name | Remarks |
|---|---|---|
| Kyuubi Hive Driver (doc) | org.apache.kyuubi.jdbc.KyuubiHiveDriver | Compile the driver from the master branch, as KYUUBI #3484, required by the Spark JDBC source, is not yet included in a released version. |
| Hive Driver (doc) | org.apache.hive.jdbc.HiveDriver | |
Refer to the docs of the chosen driver and prepare the JDBC driver jar file.
### Prepare JDBC Hive Dialect extension
Hive Dialect support is required by Spark for wrapping SQL statements correctly and sending them to the JDBC driver. Kyuubi provides a JDBC dialect extension with auto-registered Hive Dialect support for Spark. Follow the instructions in Hive Dialect Support to prepare the plugin jar file `kyuubi-extension-spark-jdbc-dialect_-*.jar`.
### Including jars of the JDBC driver and Hive Dialect extension

Choose one of the following ways to include the jar files in Spark.
- Put the jar files of the JDBC driver and Hive Dialect extension into the `$SPARK_HOME/jars` directory to make them visible to the classpath of PySpark, and add `spark.sql.extensions = org.apache.spark.sql.dialect.KyuubiSparkJdbcDialectExtension` to `$SPARK_HOME/conf/spark-defaults.conf`.

- With Spark's startup shells, include the JDBC driver when submitting the application with `--packages`, and the Hive Dialect plugin with `--jars`:

  ```shell
  $SPARK_HOME/bin/pyspark --py-files PY_FILES \
      --packages org.apache.hive:hive-jdbc:x.y.z \
      --jars /path/kyuubi-extension-spark-jdbc-dialect_-*.jar
  ```
- Set the jars and config with the SparkSession builder:

  ```python
  from pyspark.sql import SparkSession

  spark = SparkSession.builder \
      .config("spark.jars", "/path/hive-jdbc-x.y.z.jar,/path/kyuubi-extension-spark-jdbc-dialect_-*.jar") \
      .config("spark.sql.extensions", "org.apache.spark.sql.dialect.KyuubiSparkJdbcDialectExtension") \
      .getOrCreate()
  ```
## Usage
For further information about PySpark JDBC usage and options, please refer to Spark's JDBC To Other Databases.
### Reading and Writing via JDBC data source
```python
# Loading data from Kyuubi via HiveDriver as JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .options(driver="org.apache.hive.jdbc.HiveDriver",
             url="jdbc:hive2://kyuubi_server_ip:port",
             user="user",
             password="password",
             query="select * from testdb.src_table"
             ) \
    .load()

# Saving data to Kyuubi via HiveDriver as JDBC source
jdbcDF.write \
    .format("jdbc") \
    .options(driver="org.apache.hive.jdbc.HiveDriver",
             url="jdbc:hive2://kyuubi_server_ip:port",
             user="user",
             password="password",
             dbtable="testdb.tgt_table"
             ) \
    .save()
```
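For larger tables, Spark's generic JDBC options also support partitioned reads, which split one query into parallel range scans. A minimal sketch, assuming the source table has a numeric column named `id` (the column name, bounds, and partition count below are illustrative, not part of the examples above):

```python
# Sketch of partitioned-read options for the JDBC source.
# partitionColumn, lowerBound, upperBound and numPartitions must be
# set together; the bounds only decide the query stride, and rows
# outside the bounds are still read.
jdbc_options = {
    "driver": "org.apache.hive.jdbc.HiveDriver",
    "url": "jdbc:hive2://kyuubi_server_ip:port",
    "user": "user",
    "password": "password",
    "dbtable": "testdb.src_table",
    "partitionColumn": "id",   # assumed numeric column, illustrative
    "lowerBound": "0",
    "upperBound": "1000000",
    "numPartitions": "8",      # issue 8 parallel range queries
}

# With an active SparkSession, the options expand into the reader:
# jdbcDF = spark.read.format("jdbc").options(**jdbc_options).load()
```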
### Use PySpark with Pandas
Since PySpark 3.2.0, PySpark supports the pandas API on Spark, which allows you to scale out your pandas workload.
Pandas-on-Spark DataFrames and Spark DataFrames are virtually interchangeable. For more instructions, see From/to pandas and PySpark DataFrames.
```python
import pyspark.pandas as ps

psdf = ps.range(10)
sdf = psdf.to_spark().filter("id > 5")
sdf.show()
```