Flink-CDC-构建 MySQL 和 Postgres 上的 Streaming ETL

MacOS M1 上,使用 Docker 构建 MySQL 和 Postgres 上的 Streaming ETL,试用 Flink CDC 相关功能。

准备阶段

准备 docker-compose.yml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=1234
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.10.2
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
kibana:
image: elastic/kibana:7.10.2
ports:
- "5601:5601"

Docker Compose 中包含的容器有:

  • MySQL:商品表 products 和 订单表 orders 将存储在该数据库中, 这两张表将和 Postgres 数据库中的物流表 shipments 进行关联,得到一张包含更多信息的订单表 enriched_orders;
  • Postgres:物流表 shipments 将存储在该数据库中;
  • Elasticsearch:最终的订单表 enriched_orders 将写到 Elasticsearch;
  • Kibana:用来可视化 ElasticSearch 的数据。

Docker Compose 启动命令,后台启动:

1
docker-compose up -d

查看 Docker 进程,也可以通过访问 http://localhost:5601/ 来查看 Kibana 是否运行正常:

1
2
3
4
5
6
7
docker ps

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
79e04d74474b elastic/kibana:7.10.2 "/usr/local/bin/dumb…" 32 minutes ago Up 32 minutes 0.0.0.0:5601->5601/tcp docker_kibana_1
f7020940f3bc debezium/example-mysql:1.1 "docker-entrypoint.s…" 32 minutes ago Up 32 minutes 0.0.0.0:3306->3306/tcp, 33060/tcp docker_mysql_1
823b10a437a2 docker.elastic.co/elasticsearch/elasticsearch:7.10.2 "/tini -- /usr/local…" 32 minutes ago Up 32 minutes 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp docker_elasticsearch_1
330a1dfd6571 debezium/example-postgres:1.1 "docker-entrypoint.s…" 32 minutes ago Up 32 minutes 0.0.0.0:5432->5432/tcp docker_postgres_1

查看某一个 Docker 进程的日志:

1
docker logs docker_elasticsearch_1

Docker Compose 停止命令:

1
docker-compose down

注:Mac M1 请使用elasticsearch原生镜像,docker.elastic.co/elasticsearch/elasticsearch:7.10.2 ,否则启动不起来。
解决问题的链接

  1. 下载 Flink-1.13.2 安装包,也可以自行下载源码进行编译:

    1
    mvn clean install -DskipTests -Dfast -T 4 -Dmaven.compile.fork=true -Dscala-2.11
  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.13.2/lib/ 下

准备数据

在 MySQL 数据库中准备数据

  1. 进入到 docker-compose.yml 所在目录,执行如下命令进入 MySQL 容器:

    1
    docker-compose exec mysql mysql -uroot -p123456
  2. 创建数据库和表 products,orders,并插入数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    -- MySQL
    CREATE DATABASE mydb;
    USE mydb;
    CREATE TABLE products (
    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;

    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
    (default,"car battery","12V car battery"),
    (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
    (default,"hammer","12oz carpenter's hammer"),
    (default,"hammer","14oz carpenter's hammer"),
    (default,"hammer","16oz carpenter's hammer"),
    (default,"rocks","box of assorted rocks"),
    (default,"jacket","water resistent black wind breaker"),
    (default,"spare tire","24 inch spare tire");

    CREATE TABLE orders (
    order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    order_date DATETIME NOT NULL,
    customer_name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 5) NOT NULL,
    product_id INTEGER NOT NULL,
    order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;

    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
    (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
    (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

在 Postgres 数据库中准备数据

  1. 进入到 docker-compose.yml 所在目录,执行如下命令进入 Postgre 容器:

    1
    docker-compose exec postgres psql -h localhost -U postgres
  2. 创建表 shipments,并插入3条物流数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    -- PG
    CREATE TABLE shipments (
    shipment_id SERIAL NOT NULL PRIMARY KEY,
    order_id SERIAL NOT NULL,
    origin VARCHAR(255) NOT NULL,
    destination VARCHAR(255) NOT NULL,
    is_arrived BOOLEAN NOT NULL
    );
    ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
    ALTER TABLE public.shipments REPLICA IDENTITY FULL;
    INSERT INTO shipments
    VALUES (default,10001,'Beijing','Shanghai',false),
    (default,10002,'Hangzhou','Shanghai',false),
    (default,10003,'Shanghai','Hangzhou',false);
  1. 进入 Flink 安装目录:

    1
    cd $FLINK_HOME
  2. 启动 Flink 集群:

    1
    ./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI。

  1. 启动 Flink SQL CLI
    1
    ./bin/sql-client.sh
  1. 开启 checkpoint ,每隔3秒做一次 checkpoint

    1
    SET execution.checkpointing.interval = 3s;
  2. 对于数据库中的表 products, orders, shipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    CREATE TABLE products (    
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products'
    );

    CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders'
    );


    CREATE TABLE shipments (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
    ) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'shipments'
    );
  3. 创建 Elasticsearch 表 enriched_orders ,用来将关联后的订单数据写入 Elasticsearch 中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    CREATE TABLE enriched_orders (   
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    product_name STRING,
    product_description STRING,
    shipment_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
    );

关联订单数据并且将其写入 ES 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
INSERT INTO enriched_orders 
SELECT
o.*,
p.name,
p.description,
s.shipment_id,
s.origin,
s.destination,
s.is_arrived
FROM orders AS o
LEFT JOIN
products AS p
ON o.product_id = p.id
LEFT JOIN shipments AS s
ON o.order_id = s.order_id;

首先访问 创建 index pattern enriched_orders:

查看初始化写入的3条 enriched order 数据:

接下来,修改 MySQL 和 Postgre 数据库中的表数据,Kibana 中的订单数据也将实时更新:

  1. 在 MySQL 的 orders 表中插入一条数据:
1
2
--MySQL
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

  1. 在 Postgre 的 shipments 表中插入一条数据:
1
2
--PG
INSERT INTO shipments VALUES (default,10004,'Shanghai','Beijing',false);

  1. 在 MySQL 的 orders 表中更新订单状态:
1
2
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

  1. 在 Postgre 的 shipments 表中更新物流状态:
1
2
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

  1. 在 MySQL 的 orders 表中删除一条数据:
1
2
--MySQL
DELETE FROM orders WHERE order_id = 10004;

参考资料

Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL