MacOS M1 上,使用 Docker 构建 MySQL 和 Postgres 上的 Streaming ETL,试用 Flink CDC 相关功能。
准备阶段
准备 docker-compose.yml 文件
1 | version: '2.1' |
Docker Compose 中包含的容器有:
- MySQL:商品表 products 和 订单表 orders 将存储在该数据库中, 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders;
- Postgres:物流表 shipments 将存储在该数据库中;
- Elasticsearch:最终的订单表 enriched_orders 将写到 Elasticsearch;
- Kibana:用来可视化 ElasticSearch 的数据。
Docker Compose 启动命令,后台启动:
1 | docker-compose up -d |
查看 Docker 进程,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常:
1 | docker ps |
查看某一个 Docker 进程的日志:
1 | docker logs docker_elasticsearch_1 |
Docker Compose 停止命令:
1 | docker-compose down |
注:Mac M1 请使用elasticsearch原生镜像,docker.elastic.co/elasticsearch/elasticsearch:7.10.2 ,否则启动不起来。
解决问题的链接
下载 Flink-1.13.2 和依赖的 jars
下载 Flink-1.13.2 安装包,也可以自行下载源码进行编译:
1
mvn clean install -DskipTests -Dfast -T 4 -Dmaven.compile.fork=true -Dscala-2.11
下载下面列出的依赖包,并将它们放到目录 flink-1.13.2/lib/ 下
准备数据
在 MySQL 数据库中准备数据
进入到 docker-compose.yml 所在目录,执行如下命令进入 MySQL 容器:
1
docker-compose exec mysql mysql -uroot -p123456
创建数据库和表 products,orders,并插入数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
在 Postgres 数据库中准备数据
进入到 docker-compose.yml 所在目录,执行如下命令进入 Postgre 容器:
1
docker-compose exec postgres psql -h localhost -U postgres
创建表 shipments,并插入3条物流数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
启动 Flink 集群和 Flink SQL CLI
进入 Flink 安装目录:
1
cd $FLINK_HOME
启动 Flink 集群:
1
./bin/start-cluster.sh
启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI。
- 启动 Flink SQL CLI
1
./bin/sql-client.sh
在 Flink SQL CLI 使用 Flink DDL 创建表
开启 checkpoint ,每隔3秒做一次 checkpoint
1
SET execution.checkpointing.interval = 3s;
对于数据库中的表 products, orders, shipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);创建 Elasticsearch 表 enriched_orders ,用来将关联后的订单数据写入 Elasticsearch 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
关联订单数据并且将其写入 ES 中
使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中:
1 | INSERT INTO enriched_orders |
首先访问 创建 index pattern enriched_orders:
查看初始化写入的3条 enriched order 数据:
接下来,修改 MySQL 和 Postgre 数据库中的表数据,Kibana 中的订单数据也将实时更新:
- 在 MySQL 的 orders 表中插入一条数据:
1 | --MySQL |
- 在 Postgre 的 shipments 表中插入一条数据:
1 | --PG |
- 在 MySQL 的 orders 表中更新订单状态:
1 | --MySQL |
- 在 Postgre 的 shipments 表中更新物流状态:
1 | --PG |
- 在 MySQL 的 orders 表中删除一条数据:
1 | --MySQL |