Flink-CDC-实现 MySQL 数据实时写入 Apache Doris

MacOS M1 上,使用 Docker 构建 MySQL 和 Doris 上的 Streaming ETL,试用 Flink CDC 相关功能。

准备阶段

Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。

Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。

Flink Doris Connector 是 Doris 社区为了方便用户使用 Flink 读写 Doris 数据表的一个扩展,目前 Doris 支持 Flink 1.11.x ,1.12.x ,1.13.x ; Scala 2.12.x 。

目前 Flink Doris Connector 控制写入有两个参数,这两个参数同时起作用,哪个条件先到就触发写入 Doris 表操作:

  • sink.batch.size: 每多少条写入一次,默认100条;
  • sink.batch.interval:批次间的写入间隔,默认1秒;

下载 Flink Doris Connector jar 包: doris-flink-1.0-SNAPSHOT.jar

下载 Flink MySQL CDC Connector jar 包: flink-connector-mysql-cdc-2.0.2.jar

下载 Flink-1.13.3 安装包: flink-1.13.3-bin-scala_2.12.tgz

  1. 进入 Flink 安装目录:

    1
    cd $FLINK_HOME
  2. 将 Flink Doris Connector jar 包、Flink MySQL CDC Connector jar 包放入 Flink lib 目录下

  3. 启动 Flink 集群:

    1
    ./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI。

安装 MySQL

  1. docker 仓库搜索 MySQL

    1
    docker search mysql
  2. docker 仓库拉取 MySQL 8.0

    1
    docker pull mysql:8.0
  3. 查看本地仓库镜像是否拉取成功

    1
    docker images mysql:8.0
  4. 安装运行 MySQL 8.0

    1
    docker run -p 3307:3306 -name mysql8.0 -e MYSQL_ROOT_PASSWORD=root -d mysql:8.0
  5. 查看 MySQL 8.0 容器的运行情况

    1
    docker ps
  6. docker 登录 MySQL 8.0

    1
    2
    docker exec -it mysql8.0 bash
    mysql -uroot -p

安装 Doris

1
docker run -it -v $MAVEN_HOME:/root/.m2 -v ~/Work/third/incubator-doris:/root/incubator-doris apache/incubator-doris:build-env-1.2
1
2
cd /root/incubator-doris
sh build.sh --fe --be --clean

准备数据

创建 MySQL 表

1
2
3
4
5
CREATE TABLE `mysql_source` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB

创建 Doris 表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE `doris_sink` (
`id` int NULL COMMENT "",
`name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);
1
2
./bin/sql-client.sh embedded
> set execution.result-mode=tableau;
1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE mysql_source ( 
id INT,
name STRING,
primary key(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'mysql_source'
);
1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
);

将 MySQL 里的数据插入到 Doris 中

  1. 将 MySQL 里的数据通过 Flink CDC 结合 Doris Flink Connector 方式插入到 Doris 中:

    1
    INSERT INTO doris_sink select id,name from test_flink_cdc;
  2. 向 MySQL 表中插入数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    INSERT INTO mysql_source VALUES (123, 'this is a update');
    INSERT INTO mysql_source VALUES (1212, '测试flink CDC');
    INSERT INTO mysql_source VALUES (1234, '这是测试');
    INSERT INTO mysql_source VALUES (11233, 'yunian_1');
    INSERT INTO mysql_source VALUES (21233, 'yunian_2');
    INSERT INTO mysql_source VALUES (31233, 'yunian_3');
    INSERT INTO mysql_source VALUES (41233, 'yunian_4');
    INSERT INTO mysql_source VALUES (51233, 'yunian_5');
    INSERT INTO mysql_source VALUES (61233, 'yunian_6');
    INSERT INTO mysql_source VALUES (71233, 'yunian_7');
    INSERT INTO mysql_source VALUES (81233, 'yunian_8');
    INSERT INTO mysql_source VALUES (91233, 'yunian_8');
  3. 观察 Doris 表的数据

    1
    select * from doris_sink;
  4. 修改 MySQL 的数据

    1
    update mysql_source set name='这个是验证修改的操作' where id =123;
  5. 删除数据操作
    目前 Flink Doris Connector 还不支持删除操作。

参考资料

Flink CDC 系列 - 实现 MySQL 数据实时写入 Apache Doris
Docker 安装 MySQL 8.0
Apache Doris 环境安装部署
flex从2.5.37升级至2.6.4
Apache Doris官网快速入门