我们在进行大数据组件相关测试时,通常需要先在外部部署测试数据源,显得有些麻烦。在代码中拉起 docker container ,创建一个唯一干净的数据源环境,会方便许多。
本文以 MySql 数据源为例,说明下在单测中拉起 MySqlContainer ,创建唯一数据库实例,执行 sql 脚本等过程。
MySqlContainer
需引入以下 maven 包:
1 | <dependency> |
1 | public class MySqlContainer extends JdbcDatabaseContainer { |
UniqueDatabase
1 | public class UniqueDatabase { |
MySqlVersion
1 | /** |
MySqlSourceTest
首先,安装并启动 docker 服务进程,再执行以下单测用例:
1 | package com.ververica.cdc.connectors.mysql.matty; |
执行结果如下:
1 | 初始化数据库表,并插入21条数据: |
以下文件需要作为 resource 加到测试类所在的 classpath 中。
my.cnf :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22[mysqld]
skip-host-cache
skip-name-resolve
secure-file-priv=/var/lib/mysql
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = onsetup.sql :
1
2
3
4
5
6
7
8
9
10
11-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
-- 2) 'mysqluser' - all privileges
--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;customer.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29CREATE TABLE customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
参考
flink-cdc-connectorcom.ververica.cdc.connectors.mysql.testutils