Presto 基础

本文主要介绍下 Presto 的基础概念。

总体架构

查询计划

句法优化器

初始查询计划

查询最直接的查询计划非常接近其 SQL 语法结构,查询计划是树状的,执行从叶子节点开始,沿着树结构逐步上升。

1
2
3
4
5
6
7
8
9
10
11
12
13
- Limit[5]
- Sort[orders_sum DESC]
- LateralJoin[2]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey]
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
- EnforceSingleRow[region_name := r.name]
- Filter[r.regionkey = n.regionkey]
- TableScan[region]

TableScan 算子从底层存储中访问表,并返回包含表中所有行的结果集;
Filter 算子接收行并在每一行数据上应用过滤条件,只留下满足条件的行;
CrossJoin 算子从两个子节点接收数据集,返回两个数据集中行的所有组合,它可能会将其中一个数据集存放在内存中,从而避免多次访问底层存储;

谓词下推

将过滤条件移动到尽可能接近数据源的位置,使数据量在查询开始后尽可能早地开始缩减。
案例中,将原 Filter 算子的一部分条件保留在新的简化 Filter 算子中,另一部分和下层的 CrossJoin 算子合并为新的 InnerJoin 算子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] // 原始 Filter 算子
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]

...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // 简化后的 Filter 算子
- InnerJoin[o.custkey = c.custkey] // 合并后新产生的 InnerJoin 算子
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
```

#### Cross Join 消除

```txt
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // 先过滤 nationkey 列
- InnerJoin[o.custkey = c.custkey] // 然后是 Inner Join custkey
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey] // 重新排列为 custkey 在前面
- InnerJoin[n.nationkey = c.nationkey] // nationkey 在后面
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
...

局部聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- Aggregate[by custkey; totalprice := sum(totalprice)] // 局部预聚合,聚合用户维度的订单金额
- TableScan[orders]
...

Lateral Join 去关联化

Lateral Join 可以通过如下方式实现:使用 for 循环迭代一个数据集中的所有行,并对每一行执行另一次查询。

Presto 将子查询去关联化,将所有的相关条件拉取上来并形成一个标准的 Left Join。

1
2
3
4
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey)
AS region_name
FROM nation n

变换为

1
2
3
4
SELECT
r.name AS region_name,
n.name AS nation_name
FROM nation n LEFT OUTER JOIN region r ON r.regionkey = n.regionkey

平时写 odps sql ,也是习惯用下面这种标准的 left join ,如曝光表 left join 点击表 left join 行为事件表等等。

Semi-join (IN) 去关联化

子查询不只用于在查询中拉取信息,也常用于配合 IN 谓词过滤行。

下面这个查询,用于找出客户和物品供应商来自同一个国家(地区)的订单,查询这样的订单非常有用,这样可以绕过分发中心直接从供应商发货到消费者,
以此来降低运送成本:

1
2
3
4
5
6
7
8
9
10
11
12
SELECT DISTINCT o.orderkey
FROM lineitem l
JOIN orders o ON o.orderkey = l.orderkey
JOIN customer ON o.custkey = c.custkey
WHERE c.nationkey IN (
-- 多次调用的子查询
SELECT s.nationkey
FROM part p
JOIN partsupp ps ON p.partkey = ps.partkey
JOIN supplier s ON ps.suppkey = s.suppkey
WHERE p.partkey = l.partkey
);

基于代价的优化器 (CBO)

Cost Based Optimizer ,计划转换时不但基于本身的形状,也将查询数据的形状考虑在內:

  • CPU 时间
  • 内存使用
  • 网络带宽

基于代价优化的查询案例:

1
2
3
4
5
6
7
8
SELECT
n.name AS nation_name,
avg(extendedprice) AS avg_price
FROM nation n, orders o, customer c, lineitem l
WHERE n.nationkey = c.nationkey
AND c.custkey = o.custkey
AND o.orderkey = l.orderkey
GROUP BY n.nationkey, n.name;

如果不基于代价进行决策,查询优化器就会使用规则来优化此查询的初始计划。这个计划完全由 SQL 查询的语法结构所决定:

1
2
3
4
5
6
7
8
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[o.orderkey = l.orderkey]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
- TableScan[lineitem]

下面变换一个 SQL ,仅仅改变一下 WHERE 语句中条件的顺序:

1
2
3
4
5
6
7
8
SELECT
n.name AS nation_name,
avg(extendedprice) AS avg_price
FROM nation n, orders o, customer c, lineitem l
WHERE c.custkey = o.custkey
AND o.orderkey = l.orderkey
AND n.nationkey = c.nationkey
GROUP BY n.nationkey, n.name;

就产生了一个具有不同 Join 顺序的查询计划:

1
2
3
4
5
6
7
8
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[n.nationkey = c.nationkey]
- InnerJoin[o.orderkey = l.orderkey]
- InnerJoin[c.custkey = o.custkey]
- TableScan[customer]
- TableScan[orders]
- TableScan[lineitem]
- TableScan[nation]

从时间复杂度的角度来看,无论是将 nation 表 Join 到 customer 表,还是反过来将 customer 表 Join 到 nation 表,都无关紧要,
两个表都要处理,在使用 Hash Join 时,总的运行时间与输出行数成正比。
然而,时间复杂度并不是唯一的考量标准,通常对于处理数据的程序,尤其是大规模数据库系统来说,内存使用和网络流量也很重要。

连接器

blackhole Connector

它作为任何数据的最终消费者,类似于 UNIX 操作系统中的 null 设备(/dev/null)。可以把它作为从其他 catalog 中读取并插入数据的目标,实际上不写入任何内容,所以可以用它来衡量 catalog
读取的性能。

/etc/catalog/blackhole.properties:

1
connector.name = blackhole

JMX Connector

/etc/catalog/jmx.properties:

1
connector.name = jmx

为最新数据提供了名为 current 的 schema :

1
2
3
SHOW TABLES FROM jmx.current;

SELECT vmname, uptime, node FROM jmx.current."java.lang.type=runtime";

Memory Connector

可以像使用临时数据库一样使用内存连接器,所有的数据都存储在集群的内存中,停止集群就会销毁数据,可以用作调试。

/etc/catalog/memory.properties:

1
connector.name = memory

tpch Connector

TPC-H 是一款面向商品零售业的决策支持系统测试基准,它定义了8张表,22个查询,遵循 SQL92 。TPC-H 基准的数据库模式遵循第三范式,其数据维护功能仅仅限制了潜在的对索引的过度使用,而没有测试 DBMS 执行 ETL 的能力。同时,新兴的数据仓库开始采用新的模型,如星型模型、雪花模型,TPC-H 已经不能精准反应当今数据库系统的真实性能。

/etc/catalog/tpch.properties:

1
2
connector.name = tpch
tpch.splits-per-node = 4

从 TPC 官网下载 TPC-H 的 zip 包 ,需要填写一些个人信息,之后会发送一个下载链接到邮箱中:

1
2
3
cd ./916c6f4e-1935-4f81-ad6f-04165831ae11-tpc-h-tool/TPC-H_Tools_v3.0.0/dbgen
cp makefile.suite Makefile
vi Makefile

修改 Makefile 的 103 ~ 111 行:

1
2
3
4
5
6
7
8
9
CC      = gcc
# Current values for DATABASE are: INFORMIX, DB2, TDAT (Teradata)
# SQLSERVER, SYBASE, ORACLE, VECTORWISE
# Current values for MACHINE are: ATT, DOS, HP, IBM, ICL, MVS,
# SGI, SUN, U2200, VMS, LINUX, WIN32
# Current values for WORKLOAD are: TPCH
DATABASE= ORACLE
MACHINE = LINUX
WORKLOAD = TPCH

执行 make :

1
make

在 Mac 上编译可能会报如下错误,因为 mac 下的 malloc 头文件移动到了 sys 下:

1
2
3
4
5
6
gcc -g -DDBNAME=\"dss\" -DLINUX -DORACLE -DTPCH -DRNG_TEST -D_FILE_OFFSET_BITS=64    -c -o bm_utils.o bm_utils.c
bm_utils.c:71:10: fatal error: 'malloc.h' file not found
#include <malloc.h>
^~~~~~~~~~
1 error generated.
make: *** [bm_utils.o] Error 1

修改 bm_utils.c 文件的第71行,varsub.c 文件的第44行:

1
#include <malloc.h>

1
#include <sys/malloc.h>

查看 TPC-H_Tools_v3.0.0/dbgen 目录下生成了两个新的命令 dbgenqgen ,分别用来生成数据与生成 sql:

1
2
3
ls -lrt
-rwxr-xr-x 1 staff staff 105842 3 28 11:29 dbgen
-rwxr-xr-x 1 staff staff 100977 3 28 11:31 qgen

生成数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 将 dbgen、qgen 命令拷贝至一个单独的目录,便于测试
cp dbgen tpch-kit
cp dists.dss tpch-kit
cp queries tpch-kit
cp qgen tpch-kit


# -s 参数用于控制数据集规模,1GB;-f 参数强制覆盖已有数据;
cd tpch-kit
./dbgen -s 1 -f

ls -lrt *.tpl
-rw-r--r-- 1 staff staff 1409184 3 28 11:42 supplier.tbl
-rw-r--r-- 1 staff staff 24346144 3 28 11:42 customer.tbl
-rw-r--r-- 1 staff staff 171952161 3 28 11:42 orders.tbl
-rw-r--r-- 1 staff staff 759863287 3 28 11:42 lineitem.tbl
-rw-r--r-- 1 staff staff 24135125 3 28 11:42 part.tbl
-rw-r--r-- 1 staff staff 118984616 3 28 11:42 partsupp.tbl
-rw-r--r-- 1 staff staff 2224 3 28 11:42 nation.tbl
-rw-r--r-- 1 staff staff 389 3 28 11:42 region.tbl
表名 说明
supplier 供货商
customer 顾客
orders 订单
lineitem 在线商品
part 零件
partsupp 供货商的零件信息
nation 国家
region 地区

可以将 .tpl 数据转换为 csv :

1
for i in `ls *.tbl`; do sed 's/|$//' $i > ${i/tbl/csv}; echo $i; done;

生成 query sql ,DSS_QUERY=./queries 是 sql 模板,-s 表示数据集规模:

1
2
3
cd tpch-kit
mkdir q
for id in `seq 1 22`; do DSS_QUERY=./queries ./qgen -s 1 $id -b ./dists.dss > q/$id.sql; done

可能会产生如下报错:

1
Open failed for ./queries/1.sql at qgen.c:170

需要设置下环境变量:

1
2
3
4
5
sudo vi /etc/profile
export DSS_CONFIG=tpch-kit/dbgen
export DSS_QUERY=$DSS_CONFIG/PATH_TO_QUERIES_FOLDER

source /etc/profile

tpch-kit/dbgen/q 目录将会生成22个 query sql 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
cd tpch-kit/q
ls -lrt
-rw-r--r--@ 1 staff staff 598 3 28 14:02 1.sql
-rw-r--r-- 1 staff staff 769 3 28 14:02 2.sql
-rw-r--r-- 1 staff staff 478 3 28 14:02 3.sql
-rw-r--r-- 1 staff staff 421 3 28 14:02 4.sql
-rw-r--r-- 1 staff staff 556 3 28 14:02 5.sql
-rw-r--r-- 1 staff staff 311 3 28 14:02 6.sql
-rw-r--r-- 1 staff staff 893 3 28 14:02 7.sql
-rw-r--r-- 1 staff staff 868 3 28 14:02 8.sql
-rw-r--r-- 1 staff staff 672 3 28 14:02 9.sql
-rw-r--r-- 1 staff staff 592 3 28 14:02 10.sql
-rw-r--r-- 1 staff staff 568 3 28 14:02 11.sql
-rw-r--r-- 1 staff staff 659 3 28 14:02 12.sql
-rw-r--r-- 1 staff staff 413 3 28 14:02 13.sql
-rw-r--r-- 1 staff staff 395 3 28 14:02 14.sql
-rw-r--r-- 1 staff staff 594 3 28 14:02 15.sql
-rw-r--r-- 1 staff staff 550 3 28 14:02 16.sql
-rw-r--r-- 1 staff staff 350 3 28 14:02 17.sql
-rw-r--r-- 1 staff staff 516 3 28 14:02 18.sql
-rw-r--r-- 1 staff staff 1041 3 28 14:02 19.sql
-rw-r--r-- 1 staff staff 670 3 28 14:02 20.sql
-rw-r--r-- 1 staff staff 727 3 28 14:02 21.sql
-rw-r--r-- 1 staff staff 726 3 28 14:02 22.sql

tpcds Connector

TPC-DS 采用星型、雪花等多维数据模式,包含7张事实表,17张维度表,平均每张表含有18列。其工作负载包含99个 SQL 查询,覆盖 SQL92 和 2003 的核心部分以及 OLAP 。这个测试集包含对大数据集的统计、报表生成、联机查询、数据挖掘等复杂应用,测试用的数据和值是有倾斜的,与真实数据一致。TPC-DS 是与真实场景非常接近的一个测试集。

/etc/catalog/tpcds.properties:

1
2
connector.name = tpcds
tpcds.splits-per-node = 4

安装 TPC-DS 工具

由于从 TPC 官网下载的 zip 包:

生成数据时,会报如下错误,暂未找到解决办法:

1
2
3
4
5
6
dsdgen Population Generator (Version 3.2.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2021
Warning: This scale factor is valid for QUALIFICATION ONLY
Runtime ERROR: Distribution over-run/under-run
Check distribution definitions and usage for cities.
index = -1, length=1000.

因此直接从 下载并进行编译。

编译 tpcds :

1
2
3
cd ../tpc-ds-tool/tools
cp Makefile.suite Makefile
make

在 MacOS 上编译,可能会出现如下几个报错:

错误1 :因为 values.h 是 GNU 的库:

1
2
3
4
5
6
In file included from mkheader.c:37:
./porting.h:46:10: fatal error: 'values.h' file not found
#include <values.h>
^~~~~~~~~~
1 error generated.
make: *** [mkheader.o] Error 1

修改 porting.h 文件的第46行:

1
#include <values.h>

1
2
#include <limits.h>
#include <float.h>

错误2:malloc.h 头文件位置不对:

1
2
3
4
5
date.c:40:10: fatal error: 'malloc.h' file not found
#include <malloc.h>
^~~~~~~~~~
1 error generated.
make: *** [date.o] Error 1

修改 date.c 文件的第42行,dist.c 文件的第54行,misc.c 文件的第45行,tokenizer.l的第50行:

1
#include <malloc.h>

1
#include <sys/malloc.h>

错误3:缺少宏定义

1
2
3
4
genrand.c:87:12: error: use of undeclared identifier 'MAXINT'
s += MAXINT;
^
......

因为 mac 和 linux 的 h 文件差异,部分宏 mac 并没有,因此直接在 genrand.h 中自己添加即可:

1
#define MAXINT 4096000

查看 tpc-ds-tool/tools 目录下生成了两个新的命令 dbgenqgen ,分别用来生成数据与生成 sql:

1
2
3
ls -lrt *gen
-rwxr-xr-x 1 staff staff 329603 3 28 16:24 dsdgen
-rwxr-xr-x 1 staff staff 249219 3 28 16:25 dsqgen

创建 TPC-DS 测试需要用到的表

  1. 准备好创建表语句

TPC-DS 已经提前准备好了创建表相关的 SQL 文件,位于 tools 目录下:

  • tpcds.sql:创建25张表
  • tpcds_ri.sql:创建表与表之间关系的 sql 语句
  • tpcds_source.sql
  1. 利用数据库连接工具(如:navicat)创建相应的数据库和表

生成测试数据

1
2
3
4
5
cd tpc-ds-tool
mkdir data
cd tools

./dsdgen -SCALE 1GB -FORCE -DIR ../data

之后通常会通过命令将测试数据 load 到具体的数据源存储中,用于后续测试。

生成99个 query sql

1
2
3
4
5
6
7
8
9
cd tpc-ds-tool
mkdir sql
cd tools

# for循环命令,生成99个 query sql
for id in `seq 1 99`; do ./dsqgen -DIRECTORY ../query_templates -TEMPLATE "query${id}.tpl" -DIALECT oracle -FILTER Y > ../sql/query${id}.sql; done

# 生成单个 query sql 示例
./dsqgen -DIRECTORY ../query_templates -TEMPLATE "query8.tpl" -DIALECT oracle -VERBOSE Y > ../sql/query8.sql

RDBMS Connector

/etc/catalog/postgresql.properties:

1
2
3
4
connector.name = 'postgresql'
connectot-url = jdbc:postgresql://db.example.com:5432/database
connector-user = root
connector-password = secret

Hive Connector

/etc/catalog/hive.properties:

1
2
connector.name = hive-hadoop2
hive.metastore.uri = thrift://example.net:9083

建表语句:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE hive.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
view_date date,
country varchar
) WITH (
format = 'ORC',
external_location = 's3://starburst-external/pageviews',
partitioned_by = ARRAY['view_date', 'country']
)

Presto 自动发现和添加分区的命令:

1
2
3
4
5
CALL system.sync_partition_metadata (
'web',
'page_views',
'FULL'
)

Phoenix Connector

/etc/catalog/bigtables.properties:

1
2
connector.name = phoenix
phoenix.connection-url = jdbc:phoenix:zookeeper1.zookeeper2:2181:/hbase

Druid Connector

/etc/catalog/druid.properties:

1
2
3
connector.name = druid
druid.coordinator-url = http://localhost:8081
druid.broker-url = http://localhost:8082

Kafka Connector

/etc/catalog/trafficstream.properties:

1
2
3
connector.name = kafka
kafka.table-names = web.pages,web.users
kafka.nodes = trafficstream.example.com:9092

kafka 的消息过期配置项:

1
2
3
log.retention.hours = 168
log.segment.bytes = 1073741824
log.clenup.policy = delete

可以定期将 kafka 的数据迁移至 hdfs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 创建 hdfs 表
CREATE TABLE hdfs.web.page_views
WITH (
format = 'ORC',
partitioned_by = ARRAY['view_date']
)
AS
SELECT *
FROM trafficstream.web.page_views;

-- 定期查询导入
INSERT INTO hdfs.web.page_views
SELECT *
FROM trafficstream.web.page_views;

Iceberg Connector

/etc/catalog/iceberg.properties:

1
2
3
4
5
connector.name = iceberg
hive.metastore.uri = thrift://example.net:9083
iceberg.catalog.type = hive
iceberg.file-format = PARQUET
iceberg.compression-codec = GZIP

联邦查询

1
2
3
4
5
6
7
SELECT f.uniquecarrier, c.description, count(*) AS ct
FROM hive.ontime.flights_orc f, -- hive 事实明细表
postgresql.airline.carrier c -- postgresql 关系维表
WHERE c.code = f.uniquecarrier
GROUP BY f.uniquecarrier, c.description
ORDER BY count(*) DESC
LIMIT 10;

参考资料

参考书籍:《Presto实战》、《Presto技术内幕》
f8-2019-demo
官网
presto 代码库
presto 论文
presto 语言 libraries
jupyter notebook
TPC 官网
【大数据之数据仓库】TPCH工具使用指南
【大数据之数据仓库】基准测试之TPCH
Compiling TPC-H tools for Mac
tpch-kit包
【大数据之数据仓库】TPCDS工具使用指南
【大数据之数据仓库】基准测试之TPCDS
tpcds-kit包
MacOS 下编译 tpcds
DEFINE 缺失修复记录