MacOS M1 上,使用 Docker 构建 MySQL 和 Doris 上的 Streaming ETL,试用 Flink CDC 相关功能。
准备阶段
下载 Flink Doris Connector
Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。
Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。
Flink Doris Connector 是 Doris 社区为了方便用户使用 Flink 读写 Doris 数据表的一个扩展,目前 Doris 支持 Flink 1.11.x ,1.12.x ,1.13.x ; Scala 2.12.x 。
目前 Flink Doris Connector 控制写入有两个参数,这两个参数同时起作用,哪个条件先到就触发写入 Doris 表操作:
- sink.batch.size: 每多少条写入一次,默认100条;
- sink.batch.interval:批次间的写入间隔,默认1秒;
下载 Flink Doris Connector jar 包:
下载 Flink MySQL CDC Connector
下载 Flink MySQL CDC Connector jar 包: flink-connector-mysql-cdc-2.0.2.jar
安装 Flink
下载 Flink-1.13.3 安装包: flink-1.13.3-bin-scala_2.12.tgz
进入 Flink 安装目录:
1
cd $FLINK_HOME
将 Flink Doris Connector jar 包、Flink MySQL CDC Connector jar 包放入 Flink lib 目录下
启动 Flink 集群:
1
./bin/start-cluster.sh
启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI。
安装 MySQL
docker 仓库搜索 MySQL
1
docker search mysql
docker 仓库拉取 MySQL 8.0
1
docker pull mysql:8.0
查看本地仓库镜像是否拉取成功
1
docker images mysql:8.0
安装运行 MySQL 8.0
1
docker run -p 3307:3306 -name mysql8.0 -e MYSQL_ROOT_PASSWORD=root -d mysql:8.0
查看 MySQL 8.0 容器的运行情况
1
docker ps
docker 登录 MySQL 8.0
1
2docker exec -it mysql8.0 bash
mysql -uroot -p
安装 Doris
1 | docker run -it -v $MAVEN_HOME:/root/.m2 -v ~/Work/third/incubator-doris:/root/incubator-doris apache/incubator-doris:build-env-1.2 |
1 | cd /root/incubator-doris |
准备数据
创建 MySQL 表
1 | CREATE TABLE `mysql_source` ( |
创建 Doris 表
1 | CREATE TABLE `doris_sink` ( |
启动 Flink Sql Client
1 | ./bin/sql-client.sh embedded |
创建 Flink CDC Mysql 映射表
1 | CREATE TABLE mysql_source ( |
创建 Flink Doris Table 映射表
1 | CREATE TABLE doris_sink ( |
将 MySQL 里的数据插入到 Doris 中
将 MySQL 里的数据通过 Flink CDC 结合 Doris Flink Connector 方式插入到 Doris 中:
1
INSERT INTO doris_sink select id,name from test_flink_cdc;
向 MySQL 表中插入数据
1
2
3
4
5
6
7
8
9
10
11
12INSERT INTO mysql_source VALUES (123, 'this is a update');
INSERT INTO mysql_source VALUES (1212, '测试flink CDC');
INSERT INTO mysql_source VALUES (1234, '这是测试');
INSERT INTO mysql_source VALUES (11233, 'yunian_1');
INSERT INTO mysql_source VALUES (21233, 'yunian_2');
INSERT INTO mysql_source VALUES (31233, 'yunian_3');
INSERT INTO mysql_source VALUES (41233, 'yunian_4');
INSERT INTO mysql_source VALUES (51233, 'yunian_5');
INSERT INTO mysql_source VALUES (61233, 'yunian_6');
INSERT INTO mysql_source VALUES (71233, 'yunian_7');
INSERT INTO mysql_source VALUES (81233, 'yunian_8');
INSERT INTO mysql_source VALUES (91233, 'yunian_8');观察 Doris 表的数据
1
select * from doris_sink;
修改 MySQL 的数据
1
update mysql_source set name='这个是验证修改的操作' where id =123;
删除数据操作
目前 Flink Doris Connector 还不支持删除操作。
参考资料
Flink CDC 系列 - 实现 MySQL 数据实时写入 Apache Doris
Docker 安装 MySQL 8.0
Apache Doris 环境安装部署
flex从2.5.37升级至2.6.4
Apache Doris官网快速入门