kyuubi/docs/sql/z-order-benchmark.md
hzxiongyinke 0ecf8fbc7e
[KYUUBI #939] z-order performance_test
### What is the purpose of the pull request

pr for KYUUBI #939:Add Z-Order extensions to optimize table with zorder.Z-order is a technique that allows you to map multidimensional data to a single dimension. We did a performance test

for this test ,we used aliyun Databricks Delta test case
https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.10d758ccclYtVb

Prepare data for the three scenarios:

1. 10 billion data and 2 hundred files(parquet files): for big file(1G)
2. 10 billion data and 1 thousand files(parquet files): for medium file(200m)
3. one billion data and 10 hundred files(parquet files): for smaller file(200k)

test env:
spark-3.1.2
hadoop-2.7.2
kyubbi-1.4.0

test step:

Step1: create hive tables

```scala
spark.sql(s"drop database if exists $dbName cascade")
spark.sql(s"create database if not exists $dbName")
spark.sql(s"use $dbName")
spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"show tables").show(false)
```

Step2: prepare data for parquet table with three scenarios
we use the following code

```scala
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))
```

Step3: do optimize with z-order only ip, sort column: src_ip, dst_ip and shuffle partition just as file numbers .
	execute  'OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;' by kyuubi.

Step4: do optimize with z-order only ip, sort column: src_ip, dst_ip and shuffle partition just as file numbers .
	execute  'OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;' by kyuubi.

---------------------
# benchmark result
by querying the tables before and after optimization, we find that

**10 billion data and 200 files and Query resource:200 core 600G memory**

| Table               | Average File Size | Scan row count | Average query time | row count Skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 1.2 G             | 10,000,000,000 | 27.554 s           | 0.0%                     |
| conn_zorder_only_ip | 890 M             | 43,170,600     | 2.459 s            | 99.568%                  |
| conn_zorder         | 890 M             | 54,841,302     | 3.185 s            | 99.451%                  |

**10 billion data and 2000 files and Query resource:200 core 600G memory**

| Table               | Average File Size | Scan row count | Average query time | row count Skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 234.8 M           | 10,000,000,000 | 27.031 s           | 0.0%                     |
| conn_zorder_only_ip | 173.9 M           | 43,170,600     | 2.668 s            | 99.568%                  |
| conn_zorder         | 174.0 M           | 54,841,302     | 3.207 s            | 99.451%                  |

**1 billion data and 10000 files and Query resource:10 core 40G memory**

| Table               | Average File Size | Scan row count | Average query time | row count Skipping ratio |
| ------------------- | ----------------- | -------------- | ------------------ | ------------------------ |
| conn_random_parquet | 2.7 M             | 1,000,000,000  | 76.772 s           | 0.0%                     |
| conn_zorder_only_ip | 2.1 M             | 406,572        | 3.963 s            | 99.959%                  |
| conn_zorder         | 2.2 M             | 387,942        | 3.621s             | 99.961%                  |

Closes #1178 from hzxiongyinke/zorder_performance_test.

Closes #939

369a9b41 [hzxiongyinke] remove set spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension;
8c8ae458 [hzxiongyinke] add index z-order-benchmark
66bd20fd [hzxiongyinke] change tables to three scenarios
cc80f4e7 [hzxiongyinke] add License
70c29daa [hzxiongyinke] z-order performance_test
6f1892be [hzxiongyinke] Merge pull request #1 from apache/master

Lead-authored-by: hzxiongyinke <1062376716@qq.com>
Co-authored-by: hzxiongyinke <75288351+hzxiongyinke@users.noreply.github.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
2021-09-29 17:51:37 +08:00

7.6 KiB
Raw Blame History

z-order benchmark

Z-order is a technique that allows you to map multidimensional data to a single dimension. We did a performance test

for this test ,we used aliyun Databricks Delta test case https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.10d758ccclYtVb

Prepare data for the three scenarios:

  1. 10 billion data and 2 hundred filesparquet files: for big file(1G)
  2. 10 billion data and 1 thousand filesparquet files: for medium file(200m)
  3. one billion data and 10 hundred filesparquet files: for smaller file(200k)

test env spark-3.1.2 hadoop-2.7.2 kyubbi-1.4.0

test step

Step1: create hive tables

spark.sql(s"drop database if exists $dbName cascade")
spark.sql(s"create database if not exists $dbName")
spark.sql(s"use $dbName")
spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"show tables").show(false)

Step2 prepare data for parquet table with three scenarios we use the following code

def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))

Step3 do optimize with z-order only ip sort column src_ip, dst_ip and shuffle partition just as file numbers .

OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;

Step4 do optimize with z-order only ip sort column src_ip, dst_ip and shuffle partition just as file numbers .

OPTIMIZE conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;

The complete code is as follows

./spark-shell
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)

val  conf  = new SparkConf().setAppName("zorder_test")
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
import spark.implicits._

val sc = spark.sparkContext
sc.setLogLevel("WARN")
//ten billion rows and two hundred files
val numRecords = 10*1000*1000*1000L
val numFiles = 200

val dbName = s"zorder_test_$numFiles"
val baseLocation = s"hdfs://localhost:9000/zorder_test/$dbName/"
val connRandomParquet = "conn_random_parquet"
val connZorderOnlyIp = "conn_zorder_only_ip"
val connZorder = "conn_zorder"
spark.conf.set("spark.sql.shuffle.partitions", numFiles)
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.hive.convertMetastoreParquet",false)
spark.sql(s"drop database if exists $dbName cascade")
spark.sql(s"create database if not exists $dbName")
spark.sql(s"use $dbName")
spark.sql(s"create table $connRandomParquet (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorderOnlyIp (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"create table $connZorder (src_ip string, src_port int, dst_ip string, dst_port int) stored as parquet")
spark.sql(s"show tables").show(false)

import scala.util.Random
// Function for preparing Zorder_Test data
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
src_ip = randomIPv4(r), src_port = randomPort(r),
dst_ip = randomIPv4(r), dst_port = randomPort(r))

val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
val partitionID = it.toStream.head
val r = new Random(seed = partitionID)
Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
}

df.write
.mode("overwrite")
.format("parquet")
.insertInto(connRandomParquet)

spark.read.table(connRandomParquet)
.write
.mode("overwrite")
.format("parquet")
.insertInto(connZorderOnlyIp)

spark.read.table(connRandomParquet)
.write
.mode("overwrite")
.format("parquet")
.insertInto(connZorder)
spark.stop()

Optimize Sql:


set spark.sql.hive.convertMetastoreParquet=false;

OPTIMIZE conn_zorder_only_ip ZORDER BY src_ip, dst_ip;

OPTIMIZE zorder_test.conn_zorder ZORDER BY src_ip, src_port, dst_ip, dst_port;

Query Sql :


set spark.sql.hive.convertMetastoreParquet=true;

select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%';

select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';

select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';

benchmark result

by querying the tables before and after optimization, we find that

10 billion data and 200 files and Query resource:200 core 600G memory

Table Average File Size Scan row count Average query time row count Skipping ratio
conn_random_parquet 1.2 G 10,000,000,000 27.554 s 0.0%
conn_zorder_only_ip 890 M 43,170,600 2.459 s 99.568%
conn_zorder 890 M 54,841,302 3.185 s 99.451%

10 billion data and 2000 files and Query resource:200 core 600G memory

Table Average File Size Scan row count Average query time row count Skipping ratio
conn_random_parquet 234.8 M 10,000,000,000 27.031 s 0.0%
conn_zorder_only_ip 173.9 M 43,170,600 2.668 s 99.568%
conn_zorder 174.0 M 54,841,302 3.207 s 99.451%

1 billion data and 10000 files and Query resource:10 core 40G memory

Table Average File Size Scan row count Average query time row count Skipping ratio
conn_random_parquet 2.7 M 1,000,000,000 76.772 s 0.0%
conn_zorder_only_ip 2.1 M 406,572 3.963 s 99.959%
conn_zorder 2.2 M 387,942 3.621s 99.961%