### What is the purpose of the pull request

PR for KYUUBI #939: Add Z-Order extensions to optimize table with zorder.

Z-order is a technique that allows you to map multidimensional data to a single dimension. We ran a performance test based on the Aliyun Databricks Delta test case:
https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.10d758ccclYtVb

Prepare data for the three scenarios:

1. 10 billion rows in 200 Parquet files: large files (1 G)
2. 10 billion rows in 1,000 Parquet files: medium files (200 M)
3. 1 billion rows in 1,000 Parquet files: small files (200 K)

Test environment:

- spark-3.1.2
- hadoop-2.7.2
- kyuubi-1.4.0

Test steps:

Step 1: Create Hive tables

```scala
spark.sql(s"drop database if exists $dbName cascade")
spark.sql(s"create database if not exists $dbName")
spark.sql(s"use $dbName")
spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"show tables").show(false)
```

Step 2: Prepare data for the Parquet tables in the three scenarios, using the following code:

```scala
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))
```

Step 3: Optimize with Z-order on the IP columns only. The sort columns are src_ip and dst_ip, and the number of shuffle partitions equals the number of files.
Execute 'OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;' through Kyuubi.

Step 4: Optimize with Z-order on all four columns. The sort columns are src_ip, src_port, dst_ip and dst_port, and the number of shuffle partitions equals the number of files.
Execute 'OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;' through Kyuubi.

---------------------

# Benchmark result

By querying the tables before and after optimization, we obtained the following results.

**10 billion rows, 200 files; query resources: 200 cores, 600G memory**

| Table               | Average File Size | Scan row count | Average query time | Row count skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 1.2 G             | 10,000,000,000 | 27.554 s           | 0.0%                     |
| conn_zorder_only_ip | 890 M             | 43,170,600     | 2.459 s            | 99.568%                  |
| conn_zorder         | 890 M             | 54,841,302     | 3.185 s            | 99.451%                  |

**10 billion rows, 2000 files; query resources: 200 cores, 600G memory**

| Table               | Average File Size | Scan row count | Average query time | Row count skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 234.8 M           | 10,000,000,000 | 27.031 s           | 0.0%                     |
| conn_zorder_only_ip | 173.9 M           | 43,170,600     | 2.668 s            | 99.568%                  |
| conn_zorder         | 174.0 M           | 54,841,302     | 3.207 s            | 99.451%                  |

**1 billion rows, 10000 files; query resources: 10 cores, 40G memory**

| Table               | Average File Size | Scan row count | Average query time | Row count skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 2.7 M             | 1,000,000,000  | 76.772 s           | 0.0%                     |
| conn_zorder_only_ip | 2.1 M             | 406,572        | 3.963 s            | 99.959%                  |
| conn_zorder         | 2.2 M             | 387,942        | 3.621 s            | 99.961%                  |

Closes #1178 from hzxiongyinke/zorder_performance_test.

Closes #939

369a9b41 [hzxiongyinke] remove set spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension;
8c8ae458 [hzxiongyinke] add index z-order-benchmark
66bd20fd [hzxiongyinke] change tables to three scenarios
cc80f4e7 [hzxiongyinke] add License
70c29daa [hzxiongyinke] z-order performance_test
6f1892be [hzxiongyinke] Merge pull request #1 from apache/master

Lead-authored-by: hzxiongyinke <1062376716@qq.com>
Co-authored-by: hzxiongyinke <75288351+hzxiongyinke@users.noreply.github.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>

<!--
 - Licensed to the Apache Software Foundation (ASF) under one or more
 - contributor license agreements.  See the NOTICE file distributed with
 - this work for additional information regarding copyright ownership.
 - The ASF licenses this file to You under the Apache License, Version 2.0
 - (the "License"); you may not use this file except in compliance with
 - the License.  You may obtain a copy of the License at
 -
 -    http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing, software
 - distributed under the License is distributed on an "AS IS" BASIS,
 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - See the License for the specific language governing permissions and
 - limitations under the License.
 -->

<!-- DO NOT MODIFY THIS FILE DIRECTLY, IT IS AUTO GENERATED BY [org.apache.kyuubi.engine.spark.udf.KyuubiUDFRegistrySuite] -->

<div align=center>

</div>

# Z-order benchmark

Z-order is a technique that allows you to map multidimensional data to a single dimension. We ran a performance test based on the Aliyun Databricks Delta test case:
https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.10d758ccclYtVb
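
To make the idea of mapping several dimensions to one more concrete, here is a minimal sketch of bit interleaving (a Morton code) for two small integers. It is only an illustration: the object name `ZOrderSketch` and the function `interleave` are hypothetical, the sketch handles 16-bit non-negative integers only, and it is not the encoding that the Kyuubi extension applies to string or multi-column keys.

```scala
object ZOrderSketch {
  /**
   * Interleaves the low 16 bits of x and y into one Long.
   * Bit i of x lands on bit 2*i of the result, bit i of y on bit 2*i + 1.
   */
  def interleave(x: Int, y: Int): Long = {
    var z = 0L
    var i = 0
    while (i < 16) {
      z |= ((x >> i) & 1).toLong << (2 * i)
      z |= ((y >> i) & 1).toLong << (2 * i + 1)
      i += 1
    }
    z
  }

  def main(args: Array[String]): Unit = {
    // Sort a 4 x 4 grid of points by their interleaved key.
    val points = for (x <- 0 until 4; y <- 0 until 4) yield (x, y)
    points.sortBy { case (x, y) => interleave(x, y) }.foreach(println)
  }
}
```

Sorting by the interleaved key keeps points that are close in both dimensions close together in the one-dimensional order, which is the property the benchmark relies on when it clusters rows by more than one column.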

Prepare data for the three scenarios:

1. 10 billion rows in 200 Parquet files: large files (1 G)
2. 10 billion rows in 1,000 Parquet files: medium files (200 M)
3. 1 billion rows in 1,000 Parquet files: small files (200 K)

Test environment:

- spark-3.1.2
- hadoop-2.7.2
- kyuubi-1.4.0

Test steps:

Step 1: Create Hive tables

```scala
spark.sql(s"drop database if exists $dbName cascade")
spark.sql(s"create database if not exists $dbName")
spark.sql(s"use $dbName")
spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"show tables").show(false)
```

Step 2: Prepare data for the Parquet tables in the three scenarios, using the following code:

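```scala
import scala.util.Random

// Row type of the three tables
case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)

// Four random octets joined by dots, e.g. "12.200.7.91"
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
// Random port in the range 0 to 65535
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))
```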
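
The complete script further down seeds one `Random` per partition with the partition ID, which makes the generated data reproducible. The following sketch shows that property outside of Spark. The wrapper object `ConnRecordSketch` and the helper `sample` are hypothetical names introduced for illustration; the generator functions are the ones from the step above.

```scala
import scala.util.Random

final case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)

object ConnRecordSketch {
  def randomIPv4(r: Random): String = Seq.fill(4)(r.nextInt(256)).mkString(".")
  def randomPort(r: Random): Int = r.nextInt(65536)

  def randomConnRecord(r: Random): ConnRecord = ConnRecord(
    src_ip = randomIPv4(r), src_port = randomPort(r),
    dst_ip = randomIPv4(r), dst_port = randomPort(r))

  /** Generates n records from a fixed seed; the same seed yields the same records. */
  def sample(seed: Long, n: Int): List[ConnRecord] = {
    val r = new Random(seed)
    List.fill(n)(randomConnRecord(r))
  }

  def main(args: Array[String]): Unit =
    sample(seed = 0L, n = 3).foreach(println)
}
```

Because every column is drawn uniformly at random, the values in each generated file span almost the whole value range, which is the unclustered starting point the benchmark wants for `conn_random_parquet`.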

Step 3: Optimize with Z-order on the IP columns only. The sort columns are src_ip and dst_ip, and the number of shuffle partitions equals the number of files.

```sql
OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;
```

Step 4: Optimize with Z-order on all four columns. The sort columns are src_ip, src_port, dst_ip and dst_port, and the number of shuffle partitions equals the number of files.

```sql
OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;
```

The complete code is as follows:

```scala
// Start ./spark-shell, then run the following.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)

val conf = new SparkConf().setAppName("zorder_test")
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
import spark.implicits._

val sc = spark.sparkContext
sc.setLogLevel("WARN")
// Ten billion rows and two hundred files
val numRecords = 10*1000*1000*1000L
val numFiles = 200

val dbName = s"zorder_test_$numFiles"
val baseLocation = s"hdfs://localhost:9000/zorder_test/$dbName/"
val connRandomParquet = "conn_random_parquet"
val connZorderOnlyIp = "conn_zorder_only_ip"
val connZorder = "conn_zorder"
// One shuffle partition per output file
spark.conf.set("spark.sql.shuffle.partitions", numFiles)
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.hive.convertMetastoreParquet", false)
spark.sql(s"drop database if exists $dbName cascade")
spark.sql(s"create database if not exists $dbName")
spark.sql(s"use $dbName")
spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"show tables").show(false)

import scala.util.Random
// Functions for preparing the Z-order test data
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))

// One partition per file; each partition is seeded with its own ID
val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
  val partitionID = it.toStream.head
  val r = new Random(seed = partitionID)
  Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
}

df.write
  .mode("overwrite")
  .format("parquet")
  .insertInto(connRandomParquet)

// Copy the random data into the two tables that will be optimized
spark.read.table(connRandomParquet)
  .write
  .mode("overwrite")
  .format("parquet")
  .insertInto(connZorderOnlyIp)

spark.read.table(connRandomParquet)
  .write
  .mode("overwrite")
  .format("parquet")
  .insertInto(connZorder)
spark.stop()
```

Optimize SQL:

```sql
set spark.sql.hive.convertMetastoreParquet=false;

OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;

OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;
```

Query SQL:

```sql
set spark.sql.hive.convertMetastoreParquet=true;

select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%';

select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';

select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';
```
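
The prefix filters in these queries can only skip data when a reader can rule out whole files or row groups from their minimum and maximum column values. The sketch below is a simplified model of that check for a single string column and a `like 'prefix%'` predicate. It is an assumption-laden illustration, not the code Spark or Kyuubi runs: `FileStats`, `mightContainPrefix` and the sample statistics are all hypothetical.

```scala
object PrefixSkippingSketch {
  /** Hypothetical per-file min/max statistics for one string column. */
  final case class FileStats(name: String, min: String, max: String)

  /**
   * True if a file whose values lie in [min, max] could hold a value that
   * starts with `prefix`. Strings are compared lexicographically.
   */
  def mightContainPrefix(stats: FileStats, prefix: String): Boolean = {
    // The largest value must not sort before the prefix itself.
    val maxNotBelow = stats.max.compareTo(prefix) >= 0
    // The smallest value must sort before the prefix or carry the prefix.
    val minNotAbove = stats.min.compareTo(prefix) < 0 || stats.min.startsWith(prefix)
    maxNotBelow && minNotAbove
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FileStats("part-0", "0.1.1.1", "149.9.9.9"),
      FileStats("part-1", "150.0.0.0", "160.0.0.0"),
      FileStats("part-2", "157.2.0.0", "157.9.0.0"),
      FileStats("part-3", "158.0.0.0", "200.0.0.0"))
    files.foreach { f =>
      val action = if (mightContainPrefix(f, "157")) "scan" else "skip"
      println(s"${f.name}: $action")
    }
  }
}
```

With randomly generated rows, nearly every file has a min/max range wide enough to pass this check, so nothing is skipped. After clustering, each file covers a narrower range and more files can be ruled out.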

# Benchmark result

By querying the tables before and after optimization, we obtained the following results.

**10 billion rows, 200 files; query resources: 200 cores, 600G memory**

| Table               | Average File Size | Scan row count | Average query time | Row count skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 1.2 G             | 10,000,000,000 | 27.554 s           | 0.0%                     |
| conn_zorder_only_ip | 890 M             | 43,170,600     | 2.459 s            | 99.568%                  |
| conn_zorder         | 890 M             | 54,841,302     | 3.185 s            | 99.451%                  |

**10 billion rows, 2000 files; query resources: 200 cores, 600G memory**

| Table               | Average File Size | Scan row count | Average query time | Row count skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 234.8 M           | 10,000,000,000 | 27.031 s           | 0.0%                     |
| conn_zorder_only_ip | 173.9 M           | 43,170,600     | 2.668 s            | 99.568%                  |
| conn_zorder         | 174.0 M           | 54,841,302     | 3.207 s            | 99.451%                  |

**1 billion rows, 10000 files; query resources: 10 cores, 40G memory**

| Table               | Average File Size | Scan row count | Average query time | Row count skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 2.7 M             | 1,000,000,000  | 76.772 s           | 0.0%                     |
| conn_zorder_only_ip | 2.1 M             | 406,572        | 3.963 s            | 99.959%                  |
| conn_zorder         | 2.2 M             | 387,942        | 3.621 s            | 99.961%                  |
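
The skipping ratio column appears to be one minus the scanned row count divided by the total row count; that reading is an inference from the figures, not something the tables state. The sketch below recomputes it, together with the query speedup, from the numbers in the first table. `BenchmarkMath` and its two functions are hypothetical names used only for this illustration.

```scala
object BenchmarkMath {
  /** Percentage of rows that did not have to be scanned. */
  def skippingRatioPercent(scannedRows: Long, totalRows: Long): Double =
    (1.0 - scannedRows.toDouble / totalRows.toDouble) * 100.0

  /** How many times faster the optimized query ran than the baseline. */
  def speedup(baselineSeconds: Double, optimizedSeconds: Double): Double =
    baselineSeconds / optimizedSeconds

  def main(args: Array[String]): Unit = {
    // Figures copied from the first table (10 billion rows, 200 files).
    val totalRows = 10000000000L
    val ratio = skippingRatioPercent(43170600L, totalRows)
    val gain = speedup(27.554, 2.459)
    println(f"conn_zorder_only_ip skipped $ratio%.3f%% of rows")
    println(f"conn_zorder_only_ip speedup: $gain%.1fx")
  }
}
```

The same two functions can be applied to the other rows to compare the IP-only and all-column layouts in each scenario.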