Test: Docker 拉起 MySqlContainer 唯一数据库实例

我们在进行大数据组件相关测试时,通常需要先在外部部署测试数据源,显得有些麻烦。在代码中拉起 docker container ,创建一个唯一干净的数据源环境,会方便许多。
本文以 MySql 数据源为例,说明下在单测中拉起 MySqlContainer ,创建唯一数据库实例,执行 sql 脚本等过程。

MySqlContainer

需引入以下 maven 包:

1
2
3
4
5
6
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
public class MySqlContainer extends JdbcDatabaseContainer {

public static final String IMAGE = "mysql";
public static final Integer MYSQL_PORT = 3306;

private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
private static final String MYSQL_ROOT_USER = "root";

private String databaseName = "test";
private String username = "test";
private String password = "test";

public MySqlContainer(MySqlVersion version) {
// 获取 docker 镜像
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
// 设置容器暴露的端口
addExposedPort(MYSQL_PORT);
}

/**
* 使用 docker 镜像时,
* MYSQL_ROOT_PASSWORD
* MYSQL_ALLOW_EMPTY_PASSWORD
* MYSQL_RANDOM_ROOT_PASSWORD
* 三者中必须指定一个
*/
@Override
protected void configure() {
// MY_CNF -> docker/server-gtids/my.cnf
optionallyMapResourceParameterAsVolume(
MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");

// SETUP_SQL -> docker/setup.sql
if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
optionallyMapResourceParameterAsVolume(
SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
}

addEnv("MYSQL_DATABASE", databaseName);
addEnv("MYSQL_USER", username);
if (password != null && !password.isEmpty()) {
addEnv("MYSQL_PASSWORD", password);
addEnv("MYSQL_ROOT_PASSWORD", password);
} else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
} else {
throw new ContainerLaunchException(
"Empty password can be used only with the root user");
}
setStartupAttempts(3);
}


@Override
public String getDriverClassName() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return "com.mysql.cj.jdbc.Driver";
} catch (ClassNotFoundException e) {
return "com.mysql.jdbc.Driver";
}
}

@Override
public String getJdbcUrl() {
return getJdbcUrl(databaseName);
}

public String getJdbcUrl(String databaseName) {
String additionalUrlParams = constructUrlParameters("?", "&");
return "jdbc:mysql://"
+ getHost()
+ ":"
+ getDatabasePort()
+ "/"
+ databaseName
+ additionalUrlParams;
}


@Override
public String getDatabaseName() {
return databaseName;
}

@Override
public String getUsername() {
return username;
}

@Override
public String getPassword() {
return password;
}

@Override
protected String getTestQueryString() {
return "SELECT 1";
}

public int getDatabasePort() {
return getMappedPort(MYSQL_PORT);
}

@Override
protected String constructUrlForConnection(String queryString) {
String url = super.constructUrlForConnection(queryString);

if (!url.contains("useSSL=")) {
String separator = url.contains("?") ? "&" : "?";
url = url + separator + "useSSL=false";
}

if (!url.contains("allowPublicKeyRetrieval=")) {
url = url + "&allowPublicKeyRetrieval=true";
}

return url;
}

public MySqlContainer withConfigurationOverride(String s) {
parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
return this;
}

public MySqlContainer withSetupSQL(String sqlPath) {
parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
return this;
}

@Override
public MySqlContainer withDatabaseName(final String databaseName) {
this.databaseName = databaseName;
return this;
}

@Override
public MySqlContainer withUsername(final String username) {
this.username = username;
return this;
}

@Override
public MySqlContainer withPassword(final String password) {
this.password = password;
return this;
}
}

UniqueDatabase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class UniqueDatabase {

private static final String[] CREATE_DATABASE_DDL =
new String[]{"CREATE DATABASE $DBNAME$;", "USE $DBNAME$;"};
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

private final MySqlContainer container;
private final String databaseName;
private final String templateName;
private final String username;
private final String password;

public UniqueDatabase(
MySqlContainer container,
String databaseName,
String username,
String password) {
this.container = container;
String identifier = Integer.toUnsignedString(new Random().nextInt(), 36);
this.databaseName = databaseName + "_" + identifier;
this.templateName = databaseName;
this.username = username;
this.password = password;
}

/**
* 执行ddl脚本
*/
public void createAndInitialize() {
final String ddlFile = String.format("ddl/%s.sql", templateName);
final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
try (Connection connection =
DriverManager.getConnection(
container.getJdbcUrl(), username, password);
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Stream.concat(
Arrays.stream(CREATE_DATABASE_DDL),
Files.readAllLines(
Paths.get(ddlTestFile.toURI()))
.stream())
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.map(this::convertSQL)
.collect(Collectors.joining("\n"))
.split(";"))
.map(x -> x.replace("$$", ";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
}
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}

public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(container.getJdbcUrl(databaseName), username, password);
}

private String convertSQL(final String sql) {
return sql.replace("$DBNAME$", databaseName);
}

public String getHost() {
return container.getHost();
}

public int getDatabasePort() {
return container.getDatabasePort();
}

public String getDatabaseName() {
return databaseName;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}
}

MySqlVersion

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/** 
* MySql version enum.
*/
public enum MySqlVersion {
V5_5("5.5"),
V5_6("5.6"),
V5_7("5.7"),
V8_0("8.0");

private String version;

MySqlVersion(String version) {
this.version = version;
}

public String getVersion() {
return version;
}

@Override
public String toString() {
return "MySqlVersion{" + "version='" + version + '\'' + '}';
}
}

MySqlSourceTest

首先,安装并启动 docker 服务进程,再执行以下单测用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.ververica.cdc.connectors.mysql.matty;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.io.UnsupportedEncodingException;
import java.sql.*;
import java.util.stream.Stream;

public class MySqlSourceTest {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceTest.class);

private final static String databaseNamePrefix = "customer";
private final static String tableName = "customers";
private final static String username = "mysqluser";
private final static String password = "mysqlpw";

private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);

private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, databaseNamePrefix, username, password);

@Test
public void testDdlAndDml() throws Exception {

System.out.println("初始化数据库表,并插入21条数据:");
customerDatabase.createAndInitialize();
query();

System.out.println("操作数据库表并查询:");
dml();
query();
}

/**
* 创建 MySqlContainer
*
* @param version MySql 版本枚举
* @return MySqlContainer 实例
*/
private static MySqlContainer createMySqlContainer(MySqlVersion version) {
return (MySqlContainer)
new MySqlContainer(version)
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

/**
* 启动 MySqlContainer
*/
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
// testcontainers 包中的启动类
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("Containers are started.");
}

public void dml() throws SQLException {

Connection connection = customerDatabase.getJdbcConnection();
connection.setAutoCommit(false);

Statement statement = connection.createStatement();
statement.addBatch("UPDATE " + tableName + " SET address = 'Hangzhou' where id = 103");
statement.addBatch("DELETE FROM " + tableName + " where id = 102");
statement.addBatch("INSERT INTO " + tableName + " VALUES(102, 'user_2','Beijing','123567891234')");
statement.addBatch("UPDATE " + tableName + " SET address = 'Nanjing' where id = 103");
statement.executeBatch();

connection.commit();
connection.close();
}

public void query() throws Exception {
Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement();

ResultSet resultSet = statement.executeQuery(
String.format("SELECT * FROM %s", tableName));
JSONArray resultArray = resultSetToJson(resultSet);
System.out.println(resultArray);

connection.close();
}

private JSONArray resultSetToJson(ResultSet rs) throws SQLException, JSONException, UnsupportedEncodingException {

JSONArray array = new JSONArray();

ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// 遍历ResultSet中的每条数据
while (rs.next()) {
JSONObject jsonObj = new JSONObject();
// 遍历每一列
for (int i = 1; i <= columnCount; i++) {
String value;
String columnName = metaData.getColumnLabel(i);
if (rs.getString(columnName) != null && !rs.getString(columnName).equals("")) {
value = new String(rs.getBytes(columnName), "UTF-8");
} else {
value = "";
}
jsonObj.put(columnName, value);
}
array.add(jsonObj);
}
rs.close();
return array;
}

}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
初始化数据库表,并插入21条数据:
[
{"address":"Shanghai","name":"user_1","phone_number":"123567891234","id":"101"},
{"address":"Shanghai","name":"user_2","phone_number":"123567891234","id":"102"},
{"address":"Shanghai","name":"user_3","phone_number":"123567891234","id":"103"},
{"address":"Shanghai","name":"user_4","phone_number":"123567891234","id":"109"},
{"address":"Shanghai","name":"user_5","phone_number":"123567891234","id":"110"},
{"address":"Shanghai","name":"user_6","phone_number":"123567891234","id":"111"},
{"address":"Shanghai","name":"user_7","phone_number":"123567891234","id":"118"},
{"address":"Shanghai","name":"user_8","phone_number":"123567891234","id":"121"},
{"address":"Shanghai","name":"user_9","phone_number":"123567891234","id":"123"},
{"address":"Shanghai","name":"user_10","phone_number":"123567891234","id":"1009"},
{"address":"Shanghai","name":"user_11","phone_number":"123567891234","id":"1010"},
{"address":"Shanghai","name":"user_12","phone_number":"123567891234","id":"1011"},
{"address":"Shanghai","name":"user_13","phone_number":"123567891234","id":"1012"},
{"address":"Shanghai","name":"user_14","phone_number":"123567891234","id":"1013"},
{"address":"Shanghai","name":"user_15","phone_number":"123567891234","id":"1014"},
{"address":"Shanghai","name":"user_16","phone_number":"123567891234","id":"1015"},
{"address":"Shanghai","name":"user_17","phone_number":"123567891234","id":"1016"},
{"address":"Shanghai","name":"user_18","phone_number":"123567891234","id":"1017"},
{"address":"Shanghai","name":"user_19","phone_number":"123567891234","id":"1018"},
{"address":"Shanghai","name":"user_20","phone_number":"123567891234","id":"1019"},
{"address":"Shanghai","name":"user_21","phone_number":"123567891234","id":"2000"}
]

操作数据库表并查询:
[
{"address":"Shanghai","name":"user_1","phone_number":"123567891234","id":"101"},
{"address":"Beijing","name":"user_2","phone_number":"123567891234","id":"102"},
{"address":"Nanjing","name":"user_3","phone_number":"123567891234","id":"103"},
{"address":"Shanghai","name":"user_4","phone_number":"123567891234","id":"109"},
{"address":"Shanghai","name":"user_5","phone_number":"123567891234","id":"110"},
{"address":"Shanghai","name":"user_6","phone_number":"123567891234","id":"111"},
{"address":"Shanghai","name":"user_7","phone_number":"123567891234","id":"118"},
{"address":"Shanghai","name":"user_8","phone_number":"123567891234","id":"121"},
{"address":"Shanghai","name":"user_9","phone_number":"123567891234","id":"123"},
{"address":"Shanghai","name":"user_10","phone_number":"123567891234","id":"1009"},
{"address":"Shanghai","name":"user_11","phone_number":"123567891234","id":"1010"},
{"address":"Shanghai","name":"user_12","phone_number":"123567891234","id":"1011"},
{"address":"Shanghai","name":"user_13","phone_number":"123567891234","id":"1012"},
{"address":"Shanghai","name":"user_14","phone_number":"123567891234","id":"1013"},
{"address":"Shanghai","name":"user_15","phone_number":"123567891234","id":"1014"},
{"address":"Shanghai","name":"user_16","phone_number":"123567891234","id":"1015"},
{"address":"Shanghai","name":"user_17","phone_number":"123567891234","id":"1016"},
{"address":"Shanghai","name":"user_18","phone_number":"123567891234","id":"1017"},
{"address":"Shanghai","name":"user_19","phone_number":"123567891234","id":"1018"},
{"address":"Shanghai","name":"user_20","phone_number":"123567891234","id":"1019"},
{"address":"Shanghai","name":"user_21","phone_number":"123567891234","id":"2000"}
]

以下文件需要作为 resource 加到测试类所在的 classpath 中。

  • my.cnf :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    [mysqld]

    skip-host-cache
    skip-name-resolve

    secure-file-priv=/var/lib/mysql
    user=mysql

    # Disabling symbolic-links is recommended to prevent assorted security risks
    symbolic-links=0

    # ----------------------------------------------
    # Enable the binlog for replication & CDC
    # ----------------------------------------------
    server-id = 223344
    log_bin = mysql-bin
    expire_logs_days = 1
    binlog_format = row

    # enable gtid mode
    gtid_mode = on
    enforce_gtid_consistency = on
  • setup.sql :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    -- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) 
    -- 2) 'mysqluser' - all privileges
    --
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
    CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
    GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

    -- ----------------------------------------------------------------------------------------------------------------
    -- DATABASE: emptydb
    -- ----------------------------------------------------------------------------------------------------------------
    CREATE DATABASE emptydb;
  • customer.sql

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    CREATE TABLE customers (
    id INTEGER NOT NULL PRIMARY KEY,
    name VARCHAR(255) NOT NULL DEFAULT 'flink',
    address VARCHAR(1024),
    phone_number VARCHAR(512)
    );

    INSERT INTO customers
    VALUES (101,"user_1","Shanghai","123567891234"),
    (102,"user_2","Shanghai","123567891234"),
    (103,"user_3","Shanghai","123567891234"),
    (109,"user_4","Shanghai","123567891234"),
    (110,"user_5","Shanghai","123567891234"),
    (111,"user_6","Shanghai","123567891234"),
    (118,"user_7","Shanghai","123567891234"),
    (121,"user_8","Shanghai","123567891234"),
    (123,"user_9","Shanghai","123567891234"),
    (1009,"user_10","Shanghai","123567891234"),
    (1010,"user_11","Shanghai","123567891234"),
    (1011,"user_12","Shanghai","123567891234"),
    (1012,"user_13","Shanghai","123567891234"),
    (1013,"user_14","Shanghai","123567891234"),
    (1014,"user_15","Shanghai","123567891234"),
    (1015,"user_16","Shanghai","123567891234"),
    (1016,"user_17","Shanghai","123567891234"),
    (1017,"user_18","Shanghai","123567891234"),
    (1018,"user_19","Shanghai","123567891234"),
    (1019,"user_20","Shanghai","123567891234"),
    (2000,"user_21","Shanghai","123567891234");

参考

flink-cdc-connector
com.ververica.cdc.connectors.mysql.testutils