Apache Iceberg

本文为 Apache Iceberg 的入门学习笔记。

Apache Iceberg 简介

官网定义:

Apache Iceberg is an open table format for huge analytic datasets. Iceberg delivers high query performance for tables with tens of petabytes of data, along with atomic commits, concurrent writes, and SQL-compatible table evolution.

  • 在文件 Format(Parquet/Avro/Orc)之上实现 table 语义
    支持定义和变更Schema
    支持 Hidden partition 和 Partition 变更
    ACID 语义
    历史版本回溯

  • 特点
    借助 partition 和 columns 统计信息实现分区裁剪
    不绑定 HDFS,可拓展到 S3/OSS 等
    容许多个 Writer 并发写入,乐观锁机制解决冲突

数据布局Layout

Layout 分为数据文件和元数据文件两部分。

  • 数据文件(data files)
    Iceberg 表真实存储数据的文件,一般存储在 data 目录下,以 “.parquet” 结尾。

  • 清单文件(manifest file)
    每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)。通过该文件,可以过滤掉无关数据,提高检索速度。

  • 快照(snapshot)
    快照代表一张表在某个时刻的状态。每个快照版本包含某个时刻的所有数据文件列表。data files 存储在不同的 manifest files 里, manifest files 存储在一个
    manifest list 文件里面,而一个 manifest list 文件代表一个快照。

读写原理

绿色表示数据文件,蓝色表示 manifest,黄色表示快照。一次 manifest 可以认为是一次 transaction 的写入,m0 对应写了两个文件,分别落到了 Partiiotn-0 、Partition-1,m1 对应也写了两个文件,分别落到了 Partiiotn-1 、Partition-2。对于 S1 来说,是由 m0 和 m1 两个 manifest 组成的。

基于 1 开始一次新的写入,写了一个 m2 的 transaction,写了一个数据文件 2 到 Partition-1,此时会新增一个 manifest m2 和 Snapshot S2,S2 会引用之前所有的 transaction m0 和 m1。

此时,如果要进行批量读取,就可以从 S1、S2、S3 中选择一个快照读取,比如选择了 S2,就是会把 m0、m1、m2 对应的数据全部 load 出来进行读取。

如果要进行增量读取,从 S0 开始把数据 load 出来,到 S1 的时候,会拿着 S1 的数据减去 S0 的数据得到增量的数据,然后交给计算引擎去做增量处理。

查询计划

查询计划是指在表中“查询所需文件”的过程。

  • 元数据过滤
    清单文件包括分区数据元组和每个数据文件的列级统计信息。在计划期内,查询谓词会自动转换为分区数据上的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数,
    空计数,下限和上限来消除与查询谓词不匹配的文件。

  • snapshot id
    每个 snapshot id 会关联到一组 manifest files,而每一组 manifest files 包含很多 manifest file 。

  • manifest files 文件列表
    每个 manifest file 文件又记录了当前 data 数据块的元数据信息,其中就包含了文件列的最大值和最小值,然后根据这个元数据信息,索引到具体的文件块,从而更快的查询到数据。

入门实践

详细参考文档

本机启动 Hadoop 和 Hive

1
2
3
4
5
6
7
8
9
10
11
# 启动Hadoop
cd $HADOOP_HOME/sbin
./start-all.sh
hdfs dfs -ls /
drwx-wx-wx - miaowenting staff 0 2021-05-30 23:27 /tmp


# 启动Hive
cd $HIVE_HOME/bin
./hive --service metastore &
./hive --service hiveserver2 &
1
2
3
4
# 启动 Flink
cd $FLINK_HOME/bin

./start-cluster.sh
  • 配置 HADOOP_CLASSPATH 环境变量:
1
2
3
4
5
vim /etc/profile

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

source /etc/profile
  • 启动命令行:
1
2
3
4
5
6
cd $FLINK_HOME/bin

./sql-client.sh embedded \
-j /usr/local/external/iceberg-flink-runtime-0.10.0.jar \
-j /usr/local/external/flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar \
shell

创建 Iceberg 的 Catalog

创建 Hive Catalog
1
2
3
4
5
6
7
8
CREATE CATALOG hive_catalog WITH(
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:9000/user/hive/warehouse'
);

可能出现以下报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.VerifyError: Stack map does not match the one at exception handler 69
Exception Details:
Location:
org/apache/iceberg/hive/HiveCatalog.loadNamespaceMetadata(Lorg/apache/iceberg/catalog/Namespace;)Ljava/util/Map; @69: astore_2
Reason:
Type 'org/apache/hadoop/hive/metastore/api/NoSuchObjectException' (current frame, stack[0]) is not assignable to 'org/apache/thrift/TException' (stack map, stack[0])
Current Frame:
bci: @26
flags: { }
locals: { 'org/apache/iceberg/hive/HiveCatalog', 'org/apache/iceberg/catalog/Namespace' }
stack: { 'org/apache/hadoop/hive/metastore/api/NoSuchObjectException' }
Stackmap Frame:
bci: @69
flags: { }
locals: { 'org/apache/iceberg/hive/HiveCatalog', 'org/apache/iceberg/catalog/Namespace' }
stack: { 'org/apache/thrift/TException' }
Bytecode:
0x0000000: 2a2b b700 759a 0015 bb00 c759 12c9 04bd
0x0000010: 00cb 5903 2b53 b700 cebf 2ab4 0038 2bba
0x0000020: 0236 0000 b600 9ac0 0238 4d2a 2cb7 023c
0x0000030: 4eb2 00bd 1302 3e2b 2db9 0204 0100 b900
0x0000040: c504 002d b04d bb00 c759 2c12 c904 bd00
0x0000050: cb59 032b 53b7 0229 bf4d bb00 d059 bb00
0x0000060: d259 b700 d313 022b b600 d92b b600 dc13
0x0000070: 01a4 b600 d9b6 00e0 2cb7 00e3 bf4d b800
0x0000080: 40b6 00e6 bb00 d059 bb00 d259 b700 d313
0x0000090: 022d b600 d92b b600 dc13 01a4 b600 d9b6
0x00000a0: 00e0 2cb7 00e3 bf
Exception Handler Table:
bci [26, 68] => handler: 69
bci [26, 68] => handler: 69
bci [26, 68] => handler: 89
bci [26, 68] => handler: 125
Stackmap Table:
same_frame(@26)
same_locals_1_stack_item_frame(@69,Object[#111])
same_locals_1_stack_item_frame(@89,Object[#111])
same_locals_1_stack_item_frame(@125,Object[#113])

at org.apache.iceberg.flink.CatalogLoader$HiveCatalogLoader.loadCatalog(CatalogLoader.java:95)
at org.apache.iceberg.flink.FlinkCatalog.<init>(FlinkCatalog.java:104)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:132)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:122)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:378)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626)
at java.util.HashMap.forEach(HashMap.java:1289)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)

解决办法:

  1. iceberg-flink-runtime-0.10.0.jar、flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar 不要放在 $FLINK_HOME/lib 目录下

  2. 打开 sql-client.sh 文件,在 jar 包启动的地方加上 -noverify 跳过字节码校验。

type:表示 flink-connector 的连接类型
catalog-type:表示创建 catalog 的类型。目前支持 hive 和 hadoop 两种类型
uri:Hive metastore 的 thrift URI 地址
clients:Hive metastore 客户端线程池大小,默认值为2
property-version:Connector 的属性值版本号,当前版本为1,这个值主要用来确保 iceberg connector 的 property 向后兼容

1
USE CATALOG hive_catalog;
创建 Hadoop Catalog
1
2
3
4
5
6
CREATE CATALOG hadoop_catalog WITH(
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://localhost:9000/warehouse/path',
'property-version'='1'
);
1
USE CATALOG hadoop_catalog;

warehouse:表示 iceberg 的 metadata 的 data 数据存放的文件路径。

创建 Custom Catalog
1
2
3
4
5
CREATE CATALOG my_catalog WITH (
'type'='iceberg',
'catalog-impl'='com.my.custom.CatalogImpl',
'my-additional-catalog-config'='my-value'
);
通过 YAML 统一新增 Catalog

修改 sql-client-defaults.yaml 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
catalogs:  # empty list
- name: hadoop_catalog
type: iceberg
catalog-type: hadoop
warehouse: hdfs://localhost:9000/warehouse/path
property-version: 1
- name: hive_catalog
type: iceberg
catalog-type: hive
uri: thrift://localhost:9083
clients: 5
warehouse: hdfs://localhost:9000/user/hive/warehouse
property-version: 1

创建数据库

1
2
3
CREATE DATABASE iceberg_db;

USE `iceberg_db`;

创建 Iceberg 表

  • 首先要进入到指定的 catalog 和 database 目录下
1
2
3
USE CATALOG hive_catalog;

USE iceberg_db;
  • 创建非分区表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 创建 Iceberg 非分区表
CREATE TABLE sample_iceberg (
id BIGINT COMMENT 'unique id',
data STRING
);

-- 插入测试数据1
INSERT INTO sample_iceberg VALUES (1,'test1');

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 79f18eee231895abd2ab15fbd0641ade

-- 插入测试数据2
INSERT INTO sample_iceberg VALUES (2,'test2');

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: c0afa452656f42de5df6bc745fbaf022

-- 查询已插入的数据
SELECT * FROM sample_iceberg;

  • 创建分区表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 创建 Iceberg 分区表
CREATE TABLE sample_iceberg_partition (
id BIGINT COMMENT 'unique id',
data STRING
)
PARTITIONED BY (data);

-- 插入测试数据1
INSERT INTO sample_iceberg_partition PARTITION(data='city') SELECT 86;

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 9ff0581b9b16ee01c0c849c50608f1b2

-- 查询已插入的数据
SELECT * FROM sample_iceberg_partition;

  • 创建带 WITH 参数的表
1
2
3
4
5
6
7
8
9
10
11
12
13
14

-- 创建表
CREATE TABLE sample_iceberg_connector (
id BIGINT COMMENT 'unique id',
data STRING
) WITH (
'connector'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:9000/warehouse/path',
'write.format.default'='ORC'
);

支持 PARTITION BY 子句,用来设置分区字段
支持 COMMENT ‘table comment’ 子句
支持 WITH(‘key’=’value’,…) 子句,用来设置 table 级别的配置属性

暂不支持 computed column
暂不支持 primary key
暂不支持定义 watermark

  • 修改 flink-conf.yaml 配置文件,配置 checkpoint
    因为 flink 提交 Iceberg 的信息是在每次 checkpoint 时提交的,所以需要开启 checkpoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 开启 checkpoint
state.backend: filesystem
state.backend.async: true
state.backend.fs.memory-threshold: 1024
state.checkpoints.dir: hdfs://localhost:9000/flink-checkpoints
state.savepoints.dir: hdfs://localhost:9000/flink-checkpoints
state.backend.incremental: false
state.backend.local-recovery: false
# state.checkpoints.num-retained: 1
taskmanager.state.local.root-dirs: none

# checkpoint 间隔时间
execution.checkpointing.interval: 10s
# checkpoint 失败容忍次数
execution.checkpointing.tolerable-failed-checkpoints: 10
  • 首先要进入到指定的 catalog 和 database 目录下
1
2
3
USE CATALOG hive_catalog;

USE iceberg_db;
  • 创建一个 iceberg 的 sink connector
1
2
3
4
5
6
7
8
9
10
11
12
13
14

-- 创建表
CREATE TABLE table_sink_iceberg (
id BIGINT COMMENT 'unique id',
data STRING
) WITH (
'connector'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:9000/warehouse/path',
'write.format.default'='ORC'
);
  • 创建一个 datagen 的 source connector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE table_source_datagen (
userid INT,
f_random_str STRING
) WITH (
'connector'='datagen',
'rows-per-second'='5',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='1000',
'fields.f_random_str.length'='10'
);

-- 查询 source 数据,会有源源不断的数据输出
SELECT * FROM table_source_datagen;
  • 通过设置 execution.type 来切换提交流作业和批作业
1
2
-- 流式
SET execution.type = streaming;
1
2
-- 批
SET execution.type = batch;
  • 借助流式作业(默认)写入数据到 Apache Iceberg 表
1
2
3
4
5
INSERT INTO table_sink_iceberg SELECT userid,f_random_str FROM table_source_datagen;

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 52c57f566aa6da61d46ec2fc73b74b59
  • 查询 sink 到 iceberg 表里的数据
1
SELECT * FROM table_sink_iceberg;

DataStream 写入

批量读取
1
2
3
4
5
6
7
8
9
10
11
12
13
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(loader)
.streaming(false)
.build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
流式读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(loader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg streaming Read");
追加写入
1
2
3
4
5
6
7
8
9
10
11
StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.build();

env.execute("Test Iceberg DataStream");
覆盖写入
1
2
3
4
5
6
7
8
9
10
11
12
StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.overwrite(true)
.build();

env.execute("Test Iceberg DataStream");

修改表属性

1
2
3
ALTER TABLE sample SET ('write.format.default'='avro');

ALTER TABLE sample RENAME TO `hive_catalog.default.new_sample`;
  • Flink SQL 目前支持修改 iceberg 表的相关属性
  • Flink SQL 暂不支持添加列、修改列、删除列的操作,但可以通过 Iceberg Java API 来完成

删除表、库和 Catalog

1
2
3
4
5
6
7
8
9

-- 删除表
DROP TABLE sample

-- 删除库
DROP DATABASE test_db;

-- 删除 catalog
DROP CATALOG hive_catalog;

  • IcebergStreamWriter
    主要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg datafile,并发送给下游算子。

  • IcebergFilesCommitter
    为每个 checkpointId 维护了一个 datafile 文件列表,即 Map<Long,List>,这样即使中间有某个 checkpoint 的 transaction 提交失败了,
    它的 datafile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到 Iceberg 表中。

Flink Sink State 改进:

  1. 多个不同的 Flink Job 写入同一个 Iceberg 表时,如何保证写入数据的正确性?

  2. 在云端环境下,metastore 所在的数据中心和日志数据中心是两个数据中心。这时候,可能导致 Commit Transaction 到 Iceberg 经常失败的现象,长期失败会导致 State 膨胀。
    如何解决这个问题。

  • 小数据量的合并
    在 IcebergFilesCommitter 之后添加一个 Compactor 算子,用来实现少量小文件的频繁合并。

  • 大数据量的合并
    设计 Flink Batch 作业,对接 Iceberg 的 RewriteDataFilesAction 来实现表内的大数据合并。

社区规划

  1. 预计将在下个 Apache Iceberg Release 中支持:
    a. Flink Sink 流式入湖和批量入湖
    b. Flink Streaming Reader
    c. Flink Batch Reader

  2. Flink + Iceberg 对小文件的处理
    a. Committer 任务之后添加 Compactor 算子,专门处理少量数据的 compaction
    b. 设计 Flink 批任务来处理大数据量的 compaction

  3. 对接 Iceberg 的 row-level delete 功能
    a. 通过 Flink 实现 CDC 日志的实时写入和分析
    b. 通过 Flink 实现 CDC 日志的增量拉取
    c. Flink + Iceberg 支持批量的数据更新

  4. 更加完善的 Flink SQL 支持
    a. 更富的 Flink DDL 支持,例如支持增删改 column
    b. 往 Flink 社区讨论支持 hidden partition

  5. 通过 SQL extension 来完成日常数据管理
    a. 在 Icebeg 内实现 compaction 语法和命令
    b. SQL 查看 history、snapshot、manifest、files 文件

参考资料

基于 Flink+Iceberg 构建企业级实时数据湖 - 文章
基于 Flink+Iceberg 构建企业级实时数据湖 - 视频
Apache Iceberg调研
Apache Iceberg 0.11.0 最新解读视频,胡争
Flink集成Iceberg简介
Flink+Iceberg 社区使用文档
Flink+iceberg 环境搭建及问题处理
Flink集成数据湖之实时数据写入 Iceberg
Flink + Iceberg 在去哪儿的实时数仓实践