初识Presto(Trino)

本文为 Presto 的入门学习笔记。 Presto 是一个开源的分布式 SQL 查询引擎,它是为了高效查询不同系统和各种规模的数据源而从头开始设计和编写的一套系统。
由于开源纷争,Presto 现已更名为 Trino。

Presto 简介

背景及发展

Hadoop 提供的大数据解决方案使用的是 MR 计算框架,这种计算框架适用于大数据的离线和批量计算,因为该计算框架强调的是吞吐率而不是计算效率,所以其不能满足大数据快速实时 Ad-Hoc 查询计算的性能要求。

因此,开源社区和各大互联网公司纷纷进行大数据实时 Ad-Hoc 查询计算产品的研发 , Facebook 于2012年秋季开始开发 Presto,目前该产品已经在超过 1000 名 Facebook 雇员中使用,每天运行超过 30000 个查询,每日查询数据量在 1PB 级别。Facebook 称 Presto 的性能比 Hive 要好上 10 倍还多,2013年 Facebook 正式宣布开源 Presto。

特点

  • 多数据源
    目前 Presto 可以支持 Mysql、PostgreSql、Cassandra、Hive、Kafka、JMX、Iceberg 等多种 Connector,并且可以支持分库分表以及快速读取的功能。

  • 支持 SQL
    Presto 已经可以完全支持 ANSI SQL,并提供了一个 SQL Shell 给用户,用户可以直接使用 ANSI SQL 进行数据查询和计算。

  • 扩展性
    开发人员可以很容易的开发出适用于自己特定数据源的 Connector,并且可以使用 SQL 语句查询和分析自定义 Connector 中的数据。

  • 混合计算
    每种类型的数据源都对应于一种特定类型的 Connector,用户可以根据业务需要在 Presto 中针对于一种类型的 Connector 配置一个或多个 Catalog 并查询其中的数据,用户可以混合多个 Catalog 进行 join 查询和计算。

  • 高性能
    经过 Facebook 和 京东商城的测试,Presto 的查询平均性能是 Hive 的10倍以上。

  • 流水线
    由于 Presto 是基于 pipeline 进行设计的,因此在进行海量数据处理的过程中,终端用户不用等到所有的数据都处理完毕才能看到结果,而是可以像自来水管道一样,一旦开始计算,就可以立即产生一部分结果数据,并且结果数据会一部分接一部分地呈现在终端客户面前。

概念

Presto 服务进程

  • Coordinator
    Coordinator 服务进程部署于集群中一个单独的节点上,是整个 Presto 集群的管理节点。主要用于接收客户端提交的查询,查询语句解析,生成查询执行计划、Stage、Task,对生成的 Task 进行调度。
    此外,Coordinator 还对集群中的所有 Worker 进行管理,是整个 Presto 集群的 Master 进程,该进程既与 Worker 进行通信从而获得最新的 Worker 信息,又与 Client 进行通信,从而接收查询请求。

  • Worker
    在每个 Worker 节点上都存在一个 Worker 服务进程,主要进行数据的处理以及 Task 的执行。Worker 进程每隔一定的时间会向 Coordinator 上的 restful
    服务发送心跳。当客户端提交一个查询时,Coordinator 则会从当前存活的 Worker 列表中选择出合适的 Worker 节点去运行 Task。Worker 在执行每个 Task 时会进一步对当前 Task 读入的每个 Split 进行一系列的操作和处理。

Presto 模型

  • Connector

使 Presto 适配一个数据源,每一个 Catalog 对应于一个特定的连接器。

  • Catalog

定义连接到一个数据源的细节,它包含了 Schema 并配置了一个连接器来使用。

  • Schema

组织表的一种方式。Catalog 和 Schema 一起定义了一个集合的表,这些表可以查询。

  • Table

表是无序的行的集合。这些行内容被组织成带有数据类型的有名称的列。

Presto 查询执行模型

在 Presto 中一次查询执行会被分解为多个 Stage ,Stage 与 Stage 之间是有前后依赖关系的。每个 Stage 内部又会被分解为多个 Task,属于每个 Stage 的 Task 被均分在每个 Worker 上并行执行。在每个 Task 内部又会被分解为多个 Driver ,每个 Driver 负责处理一个 Split ,而且每个 Driver 由一系列前后相连的 Operator 组成,这里的每个 Operator 都代表针对于一个 Split 的操作。

  • Statement
    Statement 语句,指的是终端用户输入的用文字表示的 SQL 语句,由子句(Clause)、表达式(Expression)和断言(Predicate)组成。

  • Query
    Query 即查询执行。当 Presto 接收一个 SQL 语句并执行时,会解析该 SQL 语句,将其转变成一个查询执行和相关的查询执行计划。一个查询执行代表可以在 Presto 集群中运行的查询,是由运行在各个 Worker 上且各自之间相互关联的阶段(Stage)组成的。
    查询执行是为了完成 SQL 语句所表述的查询而实例化的配置信息、组件、查询执行计划和优化信息等。一个查询执行由 Stage、Task、Driver、Split、Operator 和 DataSource 组成,

  • Stage
    Stage 即查询执行阶段。当 Presto 运行 Query 时,Presto 会将一个 Query 拆分成具有层级关系的多个 Stage,一个 Stage 就代表查询计划的一部分。

  • Exchange
    Presto 的 Stage 是通过 Exchange 来连接另一个 Stage 的,Exchange 用于完成有上下游关系的 Stage 之间的数据交换。

  • Task
    Stage 并不会在 Presto 集群中实际运行,仅代表针对于一个 SQL 语句查询执行计划中的一部分查询的执行过程,只是用来对查询执行计划进行管理和建模。Stage 在逻辑上又被分为一系列的 Task,这些 Task 则需要实际运行在 Presto 的各个 Worker 节点上。

  • Driver
    一个 Task 包含一个或多个 Driver。一个 Driver 其实就是作用于一个 Split 的一系列 Operator 的集合。因此一个 Driver 用于处理一个 Split,并且生成相应的输出,这些输出由 Task 收集并传送给下游 Stage 中的一个 Task。一个 Driver 拥有一个输入和一个输出。

  • Operator
    一个 Operator 代表一个 Split 的一种操作,例如过滤、加权、转换等。一个 Operator 依次读取一个 Split 中的数据,将 Operator 所代表的计算和操作作用于 Split 的数据上,并产生输出。每个 Operator 均会以 Page 为最小处理单位分别读取输入数据和产生输出数据。Operator 每次只会读取一个 Page 对象,相应地,每次也只会产生一个 Page 对象。

  • Split
    Split 即分片。一个分片是一个大的数据集中的一个小的子集。而 Driver 则是作用于一个分片上的一系列操作的集合,而每个节点上运行的 Task,又包含多个 Driver,从而一个 Task 可以处理多个 Split。

  • Page
    Page 是 Presto 中处理的最小数据单元。一个 Page 对象包含多个 Block 对象,每个 Block 对象是一个字节数组,存储一个字段的若干行。多个 Block 横切的一行是真实的一行数据。一个 Page 最大为 1MB ,最多 16 * 1024 行数据。

Presto 使用场景

单一的 SQL 分析访问点

作为一个消费者和分析师,你可能会遇到数不清的问题:

  • 有时你甚至不知道数据在哪儿,只有凭借公司某个部门的内部知识或者组织内多年的工作经验,你才能找到正确的数据。
  • 为了查询多个数据库,你需要使用不同的连接和运行多种 SQL 方言的不同查询。这些查询看起来相似,行为上却不同。
  • 若不使用数据仓库,就无法使用查询合并来自不同系统的数据。

可以使用 Presto 对接这些数据库,使用一个 SQL 标准来查询所有的系统。所有的仪表盘和分析工具以及其他商业智能系统都可以指向一个系统 – Presto,并访问组织当中的所有数据。

数据仓库和数据源系统的访问点

当一个组织需要更好的理解和分析存放在无数 RDBMS 中的数据时,就可以创建和维护数据仓库系统。从多个系统中抽取的数据通过一个复杂的 ETL 过程,最终进入一个严格受控的、巨大的数据仓库。

尽管数据仓库在很多情况下非常有用,但作为一个数据分析师,你会面临很多新问题:

  • 除了原来的那些数据库,你的工具和查询现在又多了一个数据接入点。
  • 你今天就要用的数据还没放入数据仓库。加载数据的过程痛苦、昂贵又困难重重重。

Presto 允许你添加任何数据仓库作为数据源,就像其他关系数据库一样。如果想深入研究数据仓库的查询,可以在 Presto 里直接完成,也可以在这里访问数据仓库及其源数据库系统,甚至可以编写将它们组合在一起查询。

提供对任何内容的 SQL 访问

Presto 允许将所有支持的系统作为数据源进行连接。它使用标准的 ANSI SQL 和使用 SQL 的所有工具对外暴露要查询的数据。

联邦查询

将所有的数据孤岛都暴露给 Presto 是向理解数据迈出的一大步。可以使用 SQL 和标准工具来联邦查询所有内容。 在一个语句中引用并使用不同数据库和模式的 SQL 查询,这些数据库和 Schema 来自于完全不同的系统。在同一条 SQL 查询中,可以查询 Presto 中可用的所有数据源。

虚拟数据仓库的语义层

数据仓库系统为用户创造了巨大的价值,对组织来说确实一个负担。

  • 运行和维护数据仓库是一个巨大且昂贵的项目。
  • 需要专门的团队运行与管理数据仓库和相关的 ETL 过程。
  • 将数据导入数据仓库需要用户执行繁琐的操作,并且通常非常耗时。

Presto 可用作虚拟仓库。使用这一工具和标准的 ANSI SQL ,就可以定义语义层。一旦所有的数据库都设置成 Presto 的数据源,就可以直接查询它们。Presto 提供了查询这些数据库所需的计算能力。使用 SQL 和 Presto 支持的函数和运算符,可以直接从数据源获得想要的数据。在使用数据进行分析之前,无需复制、移动或转换它们。

数据湖查询引擎

在数据被存储到数据湖的存储系统时,并没有特别考虑接下来应该如何访问它们,Presto 可以使它们成为有用的数据仓库。现代数据湖通常使用 HDFS 以外的其他对象存储系统,这些系统来自云供应商或其他开源项目。 Presto 能使用 Hive 连接器连接它们,无论数据在哪里、如何存储,都可以在数据湖上使用基于 SQL 的数据分析。

SQL 转换和 ETL

Presto 也可用于迁移数据,它所提供的丰富的 SQL 函数,可以查询数据,转换数据,并将数据写入同一个数据源或任何其他数据源。

更快的响应带来更好的数据见解

复杂的问题和海量数据集带来了诸多限制。将数据复制并加载到数据仓库并在其中分析它们的整个过程会过于昂贵。计算可能消耗太多的计算资源而无法处理全部数据,或者要消耗数天才能得到答案。

Presto 一开始就避免了数据复制。Presto 的并行计算和重度优化通常能为数据分析带来性能提升。

如果原来需要 3 天的查询现在只需要 15 分钟就可以完成,那么执行这个查询便是有价值的。从这些结果中获得的知识可以执行更多的查询。

安装和配置 Presto

编译

1
mvn -T2C install -DskipTests

服务端部署

  • 从编译后的源码中拷贝jar和配置文件
1
2
3
4
5
6
7
8
9
mkdir /usr/local/presto

cp -r ~workspace/presto/presto-server/target/presto-server-0.255-SNAPSHOT /usr/local/presto

cd /usr/local/presto/presto-server-0.255-SNAPSHOT

cp ~workspace/presto/presto-server/target/presto-main/etc ./presto-server-0.255-SNAPSHOT/

ln -s presto-server-0.255-SNAPSHOT server
  • 设置环境变量
1
2
3
4
5
6
vim /etc/profile

export PRESTO_SERVER_HOME=/usr/local/presto/server
export PATH=$PATH:$PRESTO_SERVER_HOME/bin

source /etc/profile
  • 修改 config.properties
1
2
3
4
5
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8086
discovery-server.enabled=true
discovery.uri=http://localhost:8086
  • 修改 node.properties
1
2
3
4
5
6
node.id=562e42e2-e874-431f-8da5-cb779744cf7c
node.data-dir=/usr/local/presto/data
catalog.config-dir=/usr/local/presto/server/etc/catalog
plugin.dir=/usr/local/presto/server/plugin
node.server-log-file=/usr/local/presto/server/var/log/server.log
node.launcher-log-file=/usr/local/presto/server/var/log/launcher.log
  • 修改 jvm.config
1
2
3
4
5
6
7
8
9
-server
-Xmx4G
-XX:-UseBiasedLocking
-XX:+UseG1GC
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+ExitOnOutOfMemoryError
-XX:ReservedCodeCacheSize=512M
  • 修改 log.properties
1
com.facebook.presto=INFO
  • 后台启动
1
$PRESTO_SERVER_HOME/bin/launcher start
  • 前台启动
1
$PRESTO_SERVER_HOME/bin/launcher run

[Presto Web UI] http://localhost:8086/ui/)

Presto CLI

  • 从编译后的源码中拷贝jar
1
2
3
4
5
6
7
8
9
10
11
cd /usr/local/presto

mkdir -p cli/lib

cp ~workspace/presto/presto-cli/target/presto-cli-0.255-SNAPSHOT-executable.jar /usr/local/presto/cli/lib

cd /usr/local/presto/cli/lib

mv presto-cli-0.255-SNAPSHOT-executable.jar presto

chmod +x presto
  • 设置环境变量
1
2
3
4
5
6
vim /etc/profile

export PRESTO_CLI_HOME=/usr/local/presto/cli
export PATH=$PATH:$PRESTO_CLI_HOME/lib

source /etc/profile
  • 运行 cli 并查看其版本
1
2
presto --version
Presto CLI 0.255-SNAPSHOT-9095346
  • 启动 cli
1
presto --server localhost:8086
  • 额外诊断,打印调试信息
1
presto --debug
  • 执行查询
1
2
3
4
5
6
presto --server localhost:8086 --catalog tpch --schema sf1 --execute 'select nationkey,name,regionkey from nation limit 5'
"0","ALGERIA","0"
"1","ARGENTINA","1"
"2","BRAZIL","1"
"3","CANADA","1"
"4","EGYPT","4"

简单 SQL 语法

  • 查看 catalogs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
presto> show catalogs;
Catalog
------------
blackhole
druid
example
hive
jmx
localfile
memory
mysql
pinot
postgresql
raptor
sqlserver
system
tpcds
tpch
(15 rows)

Query 20210606_141818_00009_4qtix, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
  • 查看 tpch Connector 的 schemas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
presto> show schemas from tpch;
Schema
--------------------
information_schema
sf1
sf100
sf1000
sf10000
sf100000
sf300
sf3000
sf30000
tiny
(10 rows)

Query 20210606_142008_00010_4qtix, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [10 rows, 119B] [141 rows/s, 1.65KB/s]
  • 查看 tpch.sf1 的 tables
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
presto> show tables from tpch.sf1;
Table
----------
customer
lineitem
nation
orders
part
partsupp
region
supplier
(8 rows)

Query 20210606_142111_00011_4qtix, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [8 rows, 158B] [85 rows/s, 1.66KB/s]
  • 查看 tpch.sf1.nation 表中的实际数据
1
2
3
4
5
6
7
8
9
presto> select count(name) from tpch.sf1.nation;
_col0
-------
25
(1 row)

Query 20210606_142214_00012_4qtix, FINISHED, 1 node
Splits: 21 total, 21 done (100.00%)
0:00 [25 rows, 0B] [358 rows/s, 0B/s]
  • 选择使用特定 schema
1
2
presto> use tpch.sf1;
USE

参考资料

《Presto实战》
《Presto技术内幕》