From 0ecf8fbc7eb2238554854c489f15bacb8a9bb73f Mon Sep 17 00:00:00 2001 From: hzxiongyinke <1062376716@qq.com> Date: Wed, 29 Sep 2021 17:51:37 +0800 Subject: [PATCH] [KYUUBI #939] z-order performance_test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What is the purpose of the pull request pr for KYUUBI #939:Add Z-Order extensions to optimize table with zorder.Z-order is a technique that allows you to map multidimensional data to a single dimension. We did a performance test for this test ,we used aliyun Databricks Delta test case https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.10d758ccclYtVb Prepare data for the three scenarios: 1. 10 billion data and 2 hundred files(parquet files): for big file(1G) 2. 10 billion data and 1 thousand files(parquet files): for medium file(200m) 3. one billion data and 10 hundred files(parquet files): for smaller file(200k) test env: spark-3.1.2 hadoop-2.7.2 kyubbi-1.4.0 test step: Step1: create hive tables ```scala spark.sql(s"drop database if exists $dbName cascade") spark.sql(s"create database if not exists $dbName") spark.sql(s"use $dbName") spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") spark.sql(s"show tables").show(false) ``` Step2: prepare data for parquet table with three scenarios we use the following code ```scala def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") def randomPort(r: Random) = r.nextInt(65536) def randomConnRecord(r: Random) = ConnRecord( src_ip = randomIPv4(r), src_port = randomPort(r), dst_ip = randomIPv4(r), dst_port = randomPort(r)) ``` Step3: do optimize with z-order only ip, sort column: src_ip, dst_ip and shuffle partition just as file numbers . execute 'OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;' by kyuubi. Step4: do optimize with z-order only ip, sort column: src_ip, dst_ip and shuffle partition just as file numbers . execute 'OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;' by kyuubi. --------------------- # benchmark result by querying the tables before and after optimization, we find that **10 billion data and 200 files and Query resource:200 core 600G memory** | Table | Average File Size | Scan row count | Average query time | row count Skipping ratio | | ------------------- | ----------------- | -------------- | ------------------ | ------------------------ | | conn_random_parquet | 1.2 G | 10,000,000,000 | 27.554 s | 0.0% | | conn_zorder_only_ip | 890 M | 43,170,600 | 2.459 s | 99.568% | | conn_zorder | 890 M | 54,841,302 | 3.185 s | 99.451% | **10 billion data and 2000 files and Query resource:200 core 600G memory** | Table | Average File Size | Scan row count | Average query time | row count Skipping ratio | | ------------------- | ----------------- | -------------- | ------------------ | ------------------------ | | conn_random_parquet | 234.8 M | 10,000,000,000 | 27.031 s | 0.0% | | conn_zorder_only_ip | 173.9 M | 43,170,600 | 2.668 s | 99.568% | | conn_zorder | 174.0 M | 54,841,302 | 3.207 s | 99.451% | **1 billion data and 10000 files and Query resource:10 core 40G memory** | Table | Average File Size | Scan row count | Average query time | row count Skipping ratio | | ------------------- | ----------------- | -------------- | ------------------ | ------------------------ | | conn_random_parquet | 2.7 M | 1,000,000,000 | 76.772 s | 0.0% | | conn_zorder_only_ip | 2.1 M | 406,572 | 3.963 s | 99.959% | | conn_zorder | 2.2 M | 387,942 | 3.621s | 99.961% | Closes #1178 from hzxiongyinke/zorder_performance_test. Closes #939 369a9b41 [hzxiongyinke] remove set spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension; 8c8ae458 [hzxiongyinke] add index z-order-benchmark 66bd20fd [hzxiongyinke] change tables to three scenarios cc80f4e7 [hzxiongyinke] add License 70c29daa [hzxiongyinke] z-order performance_test 6f1892be [hzxiongyinke] Merge pull request #1 from apache/master Lead-authored-by: hzxiongyinke <1062376716@qq.com> Co-authored-by: hzxiongyinke <75288351+hzxiongyinke@users.noreply.github.com> Signed-off-by: ulysses-you --- docs/sql/index.rst | 1 + docs/sql/z-order-benchmark.md | 210 ++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 docs/sql/z-order-benchmark.md diff --git a/docs/sql/index.rst b/docs/sql/index.rst index 70b45aaf2..b8c894bf2 100644 --- a/docs/sql/index.rst +++ b/docs/sql/index.rst @@ -27,4 +27,5 @@ This part describes the use of the SQL References in Kyuubi, including lists of rules functions + z-order-benchmark diff --git a/docs/sql/z-order-benchmark.md b/docs/sql/z-order-benchmark.md new file mode 100644 index 000000000..968b51811 --- /dev/null +++ b/docs/sql/z-order-benchmark.md @@ -0,0 +1,210 @@ + + + + +
+ +![](../imgs/kyuubi_logo.png) + +
+ +# z-order benchmark + +Z-order is a technique that allows you to map multidimensional data to a single dimension. We did a performance test + +for this test ,we used aliyun Databricks Delta test case +https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.10d758ccclYtVb + +Prepare data for the three scenarios: + +1. 10 billion data and 2 hundred files(parquet files): for big file(1G) +2. 10 billion data and 1 thousand files(parquet files): for medium file(200m) +3. one billion data and 10 hundred files(parquet files): for smaller file(200k) + +test env: +spark-3.1.2 +hadoop-2.7.2 +kyubbi-1.4.0 + +test step: + +Step1: create hive tables + +```scala +spark.sql(s"drop database if exists $dbName cascade") +spark.sql(s"create database if not exists $dbName") +spark.sql(s"use $dbName") +spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") +spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") +spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") +spark.sql(s"show tables").show(false) +``` + +Step2: prepare data for parquet table with three scenarios +we use the following code + +```scala +def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") +def randomPort(r: Random) = r.nextInt(65536) + +def randomConnRecord(r: Random) = ConnRecord( + src_ip = randomIPv4(r), src_port = randomPort(r), + dst_ip = randomIPv4(r), dst_port = randomPort(r)) +``` + +Step3: do optimize with z-order only ip, sort column: src_ip, dst_ip and shuffle partition just as file numbers . + +``` +OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip; +``` + +Step4: do optimize with z-order only ip, sort column: src_ip, dst_ip and shuffle partition just as file numbers . + +``` +OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port; +``` + + +The complete code is as follows: + +```shell +./spark-shell +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession + +case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int) + +val conf = new SparkConf().setAppName("zorder_test") +val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() +import spark.implicits._ + +val sc = spark.sparkContext +sc.setLogLevel("WARN") +//ten billion rows and two hundred files +val numRecords = 10*1000*1000*1000L +val numFiles = 200 + +val dbName = s"zorder_test_$numFiles" +val baseLocation = s"hdfs://localhost:9000/zorder_test/$dbName/" +val connRandomParquet = "conn_random_parquet" +val connZorderOnlyIp = "conn_zorder_only_ip" +val connZorder = "conn_zorder" +spark.conf.set("spark.sql.shuffle.partitions", numFiles) +spark.conf.get("spark.sql.shuffle.partitions") +spark.conf.set("spark.sql.hive.convertMetastoreParquet",false) +spark.sql(s"drop database if exists $dbName cascade") +spark.sql(s"create database if not exists $dbName") +spark.sql(s"use $dbName") +spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") +spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") +spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet") +spark.sql(s"show tables").show(false) + +import scala.util.Random +// Function for preparing Zorder_Test data +def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") +def randomPort(r: Random) = r.nextInt(65536) + +def randomConnRecord(r: Random) = ConnRecord( +src_ip = randomIPv4(r), src_port = randomPort(r), +dst_ip = randomIPv4(r), dst_port = randomPort(r)) + +val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it => +val partitionID = it.toStream.head +val r = new Random(seed = partitionID) +Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r)) +} + +df.write +.mode("overwrite") +.format("parquet") +.insertInto(connRandomParquet) + +spark.read.table(connRandomParquet) +.write +.mode("overwrite") +.format("parquet") +.insertInto(connZorderOnlyIp) + +spark.read.table(connRandomParquet) +.write +.mode("overwrite") +.format("parquet") +.insertInto(connZorder) +spark.stop() + +``` + +Optimize Sql: + +```sql + +set spark.sql.hive.convertMetastoreParquet=false; + +OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip; + +OPTIMIZE zorder_test.conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port; +``` + + + +Query Sql : + +```sql + +set spark.sql.hive.convertMetastoreParquet=true; + +select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%'; + +select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%'; + +select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%'; +``` + + +# benchmark result +by querying the tables before and after optimization, we find that + +**10 billion data and 200 files and Query resource:200 core 600G memory** + +| Table | Average File Size | Scan row count | Average query time | row count Skipping ratio | +| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ | +| conn_random_parquet | 1.2 G | 10,000,000,000 | 27.554 s | 0.0% | +| conn_zorder_only_ip | 890 M | 43,170,600 | 2.459 s | 99.568% | +| conn_zorder | 890 M | 54,841,302 | 3.185 s | 99.451% | + + + +**10 billion data and 2000 files and Query resource:200 core 600G memory** + +| Table | Average File Size | Scan row count | Average query time | row count Skipping ratio | +| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ | +| conn_random_parquet | 234.8 M | 10,000,000,000 | 27.031 s | 0.0% | +| conn_zorder_only_ip | 173.9 M | 43,170,600 | 2.668 s | 99.568% | +| conn_zorder | 174.0 M | 54,841,302 | 3.207 s | 99.451% | + + + +**1 billion data and 10000 files and Query resource:10 core 40G memory** + +| Table | Average File Size | Scan row count | Average query time | row count Skipping ratio | +| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ | +| conn_random_parquet | 2.7 M | 1,000,000,000 | 76.772 s | 0.0% | +| conn_zorder_only_ip | 2.1 M | 406,572 | 3.963 s | 99.959% | +| conn_zorder | 2.2 M | 387,942 | 3.621s | 99.961% | +