Flink CDC Incremental Snapshot Reading Algorithm

The design of Flink CDC 2.0 borrows from Netflix's DBLog paper to stay lock-free end to end, builds on the Flink FLIP-27 Source API for horizontal scaling, and supports checkpoints during the full snapshot phase.

1.x: Integrating Debezium

Flink CDC 1.x captures data by embedding the Debezium engine. It supports the full + incremental mode and guarantees consistency of all data, but it has the following pain points:

  1. Consistency is guaranteed by locking
    To guarantee consistency, Debezium must lock the database or tables being read. A global lock can hang the whole database, while a table-level lock blocks the table being read.
  2. No horizontal scaling
    Debezium runs on a single node, so in the full snapshot phase a very large table can take hours to read.
  3. No checkpoints in the full snapshot phase
    CDC reading has two phases, full and incremental. The full phase does not support checkpoints, so after a failure the read must start over from scratch.

  • Global lock
    First a lock is acquired, then a repeatable-read transaction is opened. The lock is held while reading the binlog start position and the current table schema. This guarantees that the binlog start position and the schema that was read actually correspond, because a table's schema can change over time (columns added or dropped). Once both are captured, the SnapshotReader reads the full data inside the repeatable-read transaction; when the full read completes, the BinlogReader starts incremental reading from the recorded binlog start position, so full and incremental data join up seamlessly. (A JDBC sketch of this handshake follows the FLUSH TABLES list below.)

  • Table lock
    A table lock is the degraded form of the global lock: the global lock requires fairly high privileges, and in some environments users can only obtain table locks. A table lock is held much longer, because releasing it early would implicitly commit the repeatable-read transaction, so the lock can only be released after the full data read has finished.

FLUSH TABLES WITH READ LOCK

  • The command waits for all in-flight updates to finish and blocks all new updates.
  • Before it can succeed it must also wait for every running SELECT to finish. Worse, while it waits for running SELECTs to complete, the database is effectively unavailable: even newly arriving SELECTs are blocked, a consequence of the MySQL Query Cache mechanism.
  • The command blocks other transactions from committing.
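
To make the handshake concrete, here is a minimal JDBC sketch of the global-lock flow described above. It is an illustration only: the connection parameters and the customers table are placeholders, and the real logic lives inside Debezium's snapshot reader.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class GlobalLockSnapshotSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/customer", "mysqluser", "mysqlpw");
                Statement stmt = conn.createStatement()) {
            // 1. Take the global read lock; the binlog position and table
            //    schemas cannot change while it is held.
            stmt.execute("FLUSH TABLES WITH READ LOCK");

            // 2. Record the binlog start position (and schemas) under the lock.
            try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                rs.next();
                System.out.printf("binlog start: %s@%d%n", rs.getString(1), rs.getLong(2));
            }

            // 3. Open a repeatable-read transaction pinned to this point in
            //    time, then release the lock early (the global-lock case;
            //    with only a table lock it must be held until step 4 is done).
            conn.setAutoCommit(false);
            stmt.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            stmt.execute("UNLOCK TABLES");

            // 4. The SnapshotReader's job: full read inside the transaction.
            try (ResultSet rs = stmt.executeQuery("SELECT * FROM customers")) {
                while (rs.next()) { /* emit READ records */ }
            }
            conn.commit();
            // 5. The BinlogReader then resumes from the recorded position.
        }
    }
}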

Single-Chunk Read Consistency

Unlike DBLog, Flink CDC 2.0 maintains no extra watermark table. Instead it runs SHOW MASTER STATUS immediately before and after selecting a chunk's data to obtain the binlog offsets, which avoids intruding on the source system.

The snapshot read logic (sketched in code after the list):

  1. Run SHOW MASTER STATUS to get the low watermark lw and append it to the queue
  2. Read the records within this chunk and append them to the queue
  3. Run SHOW MASTER STATUS to get the high watermark hw and append it to the queue
  4. Check whether any incremental changes happened between lw and hw
  5. If there were none, append a BINLOG_END record to the queue
  6. Otherwise read the binlog between [lw, hw], append it to the queue, and end with a BINLOG_END record
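
A self-contained sketch of these six steps, assuming simplified types: the Offset class and the three helper methods are stand-ins for SHOW MASTER STATUS, the chunk SELECT, and a binlog client, not the connector's real API.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;

/** A sketch of the lock-free single-chunk read. */
public class ChunkReadSketch {

    /** Simplified stand-in for the connector's BinlogOffset. */
    static final class Offset {
        final String file; final long pos;
        Offset(String file, long pos) { this.file = file; this.pos = pos; }
        @Override public boolean equals(Object o) {
            return o instanceof Offset && ((Offset) o).pos == pos
                    && Objects.equals(((Offset) o).file, file);
        }
        @Override public int hashCode() { return Objects.hash(file, pos); }
    }

    static final Object BINLOG_END = "BINLOG_END";

    // Hypothetical helpers: in the connector these are backed by
    // `SHOW MASTER STATUS`, a chunk SELECT, and the binlog client.
    Offset showMasterStatus() { return new Offset("mysql-bin.000003", 11790L); }
    List<Object> selectChunkRows(long lowKey, long highKey) { return new ArrayList<>(); }
    List<Object> readBinlogBetween(Offset lw, Offset hw, long lowKey, long highKey) {
        return new ArrayList<>();
    }

    Queue<Object> readChunk(long lowKey, long highKey) {
        Queue<Object> queue = new ArrayDeque<>();
        Offset lw = showMasterStatus();                 // step 1: low watermark
        queue.add(lw);
        queue.addAll(selectChunkRows(lowKey, highKey)); // step 2: snapshot rows
        Offset hw = showMasterStatus();                 // step 3: high watermark
        queue.add(hw);
        if (lw.equals(hw)) {                            // steps 4/5: nothing changed meanwhile
            queue.add(BINLOG_END);
        } else {                                        // step 6: replay [lw, hw] for this key range
            queue.addAll(readBinlogBetween(lw, hw, lowKey, highKey));
            queue.add(BINLOG_END);
        }
        return queue;
    }
}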

Then normalize the queued data to obtain the chunk's contents at point-in-time hw (see the sketch after this list):

  1. Append lw to normalizedBinlogRecords
  2. Iterate over the records in binlogRecords
  3. For a delete record, remove the row from snapshotRecords
  4. For an update record, map.put the record's After image into snapshotRecords as a READ record
  5. For a create record, map.put it into snapshotRecords
  6. Append snapshotRecords.values to normalizedBinlogRecords
  7. Append hw to normalizedBinlogRecords
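
A sketch of this normalization under the same simplifications; snapshotRecords is keyed by primary key, which is what gives the map.put calls their upsert semantics.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch: normalize a chunk's queue to its point-in-time-hw contents. */
public class ChunkNormalizeSketch {

    enum Op { READ, CREATE, UPDATE, DELETE }

    /** Simplified change record: primary key, operation, and after-image. */
    static final class Record {
        final long key; final Op op; final Object after;
        Record(long key, Op op, Object after) { this.key = key; this.op = op; this.after = after; }
    }

    List<Object> normalize(Object lw, Object hw,
                           Map<Long, Record> snapshotRecords,  // keyed by primary key
                           List<Record> binlogRecords) {
        List<Object> normalized = new ArrayList<>();
        normalized.add(lw);                                    // step 1
        for (Record r : binlogRecords) {                       // step 2
            switch (r.op) {
                case DELETE:                                   // step 3
                    snapshotRecords.remove(r.key);
                    break;
                case UPDATE:                                   // step 4: keep the After image as a READ record
                case CREATE:                                   // step 5
                    snapshotRecords.put(r.key, new Record(r.key, Op.READ, r.after));
                    break;
                default:
                    break;
            }
        }
        normalized.addAll(snapshotRecords.values());           // step 6
        normalized.add(hw);                                    // step 7
        return normalized;
    }
}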

Incremental Phase

Single-chunk reads execute concurrently on multiple SourceReaders without affecting one another. Suppose a job synchronizes three tables t1/t2/t3 that are split into 6 chunks; because the chunks run concurrently, their high and low watermarks can interleave on the binlog:

For example, t1.split1 and t2.split1 may read overlapping binlog ranges (both read the binlog between [lw2.1, hw1.1]), and t3.split2 may run before t3.split1. Such overlaps and reorderings do not affect correctness, because during the full phase each MysqlSourceReader reports every split's hw to the MysqlSourceEnumerator, and the incremental phase uses those hw values to guarantee that no binlog event is lost.

Once the MysqlSourceEnumerator has collected the hw of every split, it creates a binlog split that contains the binlog start position (the minimum of all splits' hw) and the hw information of every split. The enumerator assigns this binlog split to a single MysqlSourceReader, and the job switches from the full phase to the incremental phase.
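
The choice of starting position is simply the minimum across all reported high watermarks, as in this sketch (offsets reduced to longs for illustration):

import java.util.Collections;
import java.util.List;

/** Sketch: the binlog split starts at the smallest hw of all snapshot splits. */
public class BinlogSplitStartSketch {
    static long startingPosition(List<Long> chunkHighWatermarks) {
        // Starting at the minimum hw means no event after any chunk's snapshot
        // can be skipped; anything re-read is filtered out by the per-chunk
        // hw check shown in the next sketch.
        return Collections.min(chunkHighWatermarks);
    }
}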

After reading a binlog record, the MysqlSourceReader decides whether to forward it downstream using the following rules (sketched in code after the list):

  1. If the record's position is at or past the maximum hw of its table, that table has entered the Pure Binlog Phase; such records need no comparison and are forwarded directly
  2. Otherwise, when a binlog record falls into some split's primary-key range, it is forwarded downstream only if its position is after that split's hw
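
In simplified form (binlog positions and key ranges reduced to longs; the real connector compares BinlogOffset objects and uses the finished-split metadata carried by the binlog split), the emit decision looks like this:

import java.util.List;
import java.util.Map;

/** Sketch of the binlog-record emit filter used in the incremental phase. */
public class ShouldEmitSketch {

    /** Hypothetical view of a finished snapshot split: its key range plus its hw. */
    static final class FinishedSplit {
        final long keyStart, keyEnd, highWatermark;
        FinishedSplit(long s, long e, long hw) { keyStart = s; keyEnd = e; highWatermark = hw; }
        boolean containsKey(long key) { return key >= keyStart && key < keyEnd; }
    }

    boolean shouldEmit(String table, long key, long position,
                       Map<String, Long> maxHighWatermarkPerTable,
                       Map<String, List<FinishedSplit>> finishedSplitsPerTable) {
        // Rule 1: at or past the table's max hw => Pure Binlog Phase, emit directly.
        if (position >= maxHighWatermarkPerTable.get(table)) {
            return true;
        }
        // Rule 2: inside some chunk's key range and past that chunk's hw.
        for (FinishedSplit split : finishedSplitsPerTable.get(table)) {
            if (split.containsKey(key)) {
                return position > split.highWatermark;
            }
        }
        return false;
    }
}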

Implementation Details

The class relationships of the MySQL CDC connector are as follows:

MySqlSourceReader

public class MySqlSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<
                SourceRecord, T, MySqlSplit, MySqlSplitState> {

    /** MySqlSource configuration. */
    private final MySqlSourceConfig sourceConfig;

    /** Snapshot splits that have finished reading but have not been acknowledged yet. */
    private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;

    /** Binlog splits whose metadata is not yet complete. */
    private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;

    /** Subtask id of this reader within the TaskManager. */
    private final int subtaskId;

    /** Starts the SourceReader. */
    @Override
    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            // The state backend holds no splits yet, so send a RequestSplitEvent
            // through the SourceOperator to the SplitEnumerator component of
            // the SourceCoordinator.
            context.sendSplitRequest();
        }
    }

    /** Returns how many splits are currently stored in the state backend. */
    public int getNumberOfCurrentlyAssignedSplits() {
        return this.splitStates.size();
    }

    /*
     * Call chain: addSplits -> initializedState -> super.addSplits ->
     * createSplitFetcher -> fetcher.addSplits() (enqueue into the fetcher's
     * task deque) -> fetcher.start() (submit the fetcher thread to the pool).
     */
    @Override
    public void addSplits(List<MySqlSplit> splits) {
        // Restore finishedUnackedSplits: loop over the splits and collect the
        // ones that have not finished processing yet.
        List<MySqlSplit> unfinishedSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            if (split.isSnapshotSplit()) {
                MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                } else {
                    unfinishedSplits.add(split);
                }
            } else {
                // the binlog split is uncompleted
                if (!split.asBinlogSplit().isCompletedSplit()) {
                    uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                    requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
                } else {
                    uncompletedBinlogSplits.remove(split.splitId());
                    MySqlBinlogSplit mySqlBinlogSplit =
                            discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                    unfinishedSplits.add(mySqlBinlogSplit);
                }
            }
        }
        // notify split enumerator again about the finished unacked snapshot splits
        reportFinishedSnapshotSplitsIfNeed();
        // add all un-finished splits (including binlog split) to SourceReaderBase
        super.addSplits(unfinishedSplits);
    }

    /** Builds the MySqlSplitState that corresponds to a MySqlSplit. */
    @Override
    protected MySqlSplitState initializedState(MySqlSplit split) {
        if (split.isSnapshotSplit()) {
            return new MySqlSnapshotSplitState(split.asSnapshotSplit());
        } else {
            return new MySqlBinlogSplitState(split.asBinlogSplit());
        }
    }

    /** On each checkpoint, records the list of MySqlSplits in the state backend. */
    @Override
    public List<MySqlSplit> snapshotState(long checkpointId) {
        // unfinished splits
        List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);

        // add finished snapshot splits that didn't receive ack yet
        stateSplits.addAll(finishedUnackedSplits.values());

        // add binlog splits who are uncompleted
        stateSplits.addAll(uncompletedBinlogSplits.values());

        return stateSplits;
    }
}

SourceReaderBase

public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    /** Poll method of the parent class SourceReaderBase. */
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
        if (recordsWithSplitId == null) {
            recordsWithSplitId = this.getNextFetch(output);
            if (recordsWithSplitId == null) {
                return this.trace(this.finishedOrAvailableLater());
            }
        }

        do {
            // read the next record out of the current split
            E record = recordsWithSplitId.nextRecordFromSplit();
            if (record != null) {
                // Hand the record to the external SourceOutput via MySqlRecordEmitter;
                // output.collect then passes the SourceRecord to the downstream system.
                this.recordEmitter.emitRecord(record, this.currentSplitOutput, this.currentSplitContext.state);
                LOG.trace("Emitted record: {}", record);
                return this.trace(InputStatus.MORE_AVAILABLE);
            }
        } while (this.moveToNextSplit(recordsWithSplitId, output));

        return this.pollNext(output);
    }

    /** addSplits method of the parent class SourceReaderBase. */
    public void addSplits(List<SplitT> splits) {
        LOG.info("Adding split(s) to reader: {}", splits);
        splits.forEach(s ->
                // Record the SplitState for each splitId: the subclass's
                // initializedState(s) builds the state, which is stored in
                // the splitStates map.
                this.splitStates.put(
                        s.splitId(),
                        new SourceReaderBase.SplitContext(s.splitId(), this.initializedState(s))));
        this.splitFetcherManager.addSplits(splits);
    }

    /*
     * Closes the reader: shuts down the SplitFetcher threads managed by the
     * splitFetcherManager's thread pool, then closes the pool itself.
     */
    public void close() throws Exception {
        LOG.info("Closing Source Reader.");
        this.splitFetcherManager.close(this.options.sourceReaderCloseTimeout);
    }
}

MySqlSourceReaderTest

public class MySqlSourceReaderTest extends MySqlSourceTestBase {

    // a unique MySQL database instance, created and populated for every JUnit run
    private final UniqueDatabase customerDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    /** Tests the failover/recovery path of binlog reading. */
    @Test
    public void testBinlogReadFailoverCrossTransaction() throws Exception {
        // create the database and run its SQL init script
        customerDatabase.createAndInitialize();
        // MySqlSource configuration: capture the table `customers`
        final MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
        // row type used to format the captured records
        final DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("address", DataTypes.STRING()),
                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
        MySqlSplit binlogSplit;
        try (MySqlConnection jdbc =
                DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
            Map<TableId, TableChanges.TableChange> tableSchemas =
                    TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
            // Create a MySqlBinlogSplit: `SHOW MASTER STATUS` determines the
            // binlog offset, then the table schemas are filled in.
            binlogSplit =
                    MySqlBinlogSplit.fillTableSchemas(
                            createBinlogSplit(sourceConfig).asBinlogSplit(), tableSchemas);
        }

        // Create the MySqlSourceReader along with its elementsQueue,
        // MySqlRecordEmitter and MySqlSplitReader.
        MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig);
        reader.start();
        // Simulate assigning the MySqlBinlogSplit:
        // MySqlSourceReader -> SplitFetcherManager -> MySqlSplitReader
        reader.addSplits(Arrays.asList(binlogSplit));

        // step-1: make 6 change events in one MySQL transaction
        TableId tableId = binlogSplit.getTableSchemas().keySet().iterator().next();
        makeBinlogEventsInOneTransaction(sourceConfig, tableId.toString());

        // step-2: fetch the first 2 records belonging to the MySQL transaction
        String[] expectedRecords =
                new String[] {
                    "-U[103, user_3, Shanghai, 123567891234]",
                    "+U[103, user_3, Hangzhou, 123567891234]"
                };
        // the 2 records are produced by 1 operation
        List<String> actualRecords = consumeRecords(reader, dataType, 1);

        assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords);

        // Snapshot the binlog split state: the whole MySqlBinlogSplit is
        // serialized into state.
        List<MySqlSplit> splitsState = reader.snapshotState(1L);
        // check the binlog split state
        assertEquals(1, splitsState.size());
        // close the reader
        reader.close();

        // step-3: mock failover from a restored state
        MySqlSourceReader<SourceRecord> restartReader = createReader(sourceConfig);
        restartReader.start();
        // simulate restoring from splitsState by re-adding the stored split
        restartReader.addSplits(splitsState);

        // step-4: fetch the remaining 4 records belonging to the MySQL transaction
        String[] expectedRestRecords =
                new String[] {
                    "-D[102, user_2, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "-U[103, user_3, Hangzhou, 123567891234]",
                    "+U[103, user_3, Shanghai, 123567891234]"
                };
        // the 4 records are produced by 3 operations
        List<String> restRecords = consumeRecords(restartReader, dataType, 3);

        assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords);
        restartReader.close();
    }

    /** Creates a binlog split. */
    private MySqlSplit createBinlogSplit(MySqlSourceConfig sourceConfig) {
        MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
        binlogSplitAssigner.open();
        return binlogSplitAssigner.getNext().get();
    }

    /** Convenience overload used above; a TestingReaderContext stands in for the runtime context. */
    private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration) {
        return createReader(configuration, new TestingReaderContext());
    }

    /*
     * Creates the MySqlSourceReader along with its blocking queue,
     * MySqlRecordEmitter and MySqlSplitReader.
     */
    private MySqlSourceReader<SourceRecord> createReader(
            MySqlSourceConfig configuration, SourceReaderContext readerContext) {
        final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                new FutureCompletingBlockingQueue<>();
        final MySqlRecordEmitter<SourceRecord> recordEmitter =
                new MySqlRecordEmitter<>(
                        new ForwardDeserializeSchema(),
                        new MySqlSourceReaderMetrics(readerContext.metricGroup()),
                        configuration.isIncludeSchemaChanges());
        return new MySqlSourceReader<>(
                elementsQueue,
                () -> createSplitReader(configuration),
                recordEmitter,
                readerContext.getConfiguration(),
                readerContext,
                configuration);
    }

    /** Performs 4 database operations inside a single transaction. */
    private void makeBinlogEventsInOneTransaction(MySqlSourceConfig sourceConfig, String tableId)
            throws SQLException {
        JdbcConnection connection = DebeziumUtils.openJdbcConnection(sourceConfig);
        // make 6 binlog events by 4 operations
        connection.setAutoCommit(false);
        connection.execute(
                "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
                "DELETE FROM " + tableId + " where id = 102",
                "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
                "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
        connection.commit();
        connection.close();
    }

    /** Pulls the next batch of records out of the elements queue. */
    private List<String> consumeRecords(
            MySqlSourceReader<SourceRecord> sourceReader, DataType recordType, int changeEventNum)
            throws Exception {
        // Poll all the n records of the single split.
        // create the ReaderOutput
        final SimpleReaderOutput output = new SimpleReaderOutput();

        // keep calling sourceReader.pollNext() until the output holds the
        // requested number of change events
        while (output.getResults().size() < changeEventNum) {
            sourceReader.pollNext(output);
        }
        final RecordsFormatter formatter = new RecordsFormatter(recordType);
        return formatter.format(output.getResults());
    }
}

The SQL script executed by the test class is as follows:

CREATE TABLE customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);

INSERT INTO customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");

Now let's trace how the data changes:

  1. The MySqlBinlogSplit that gets created looks like this:

    {
        "splitId":"binlog-split",
        "binlogSplit":true,
        "completedSplit":true, // true because totalFinishedSplitSize == finishedSnapshotSplitInfos.size()
        "finishedSnapshotSplitInfos":[],
        "snapshotSplit":false,
        "startingOffset":{
            "filename":"mysql-bin.000003",
            "gtidSet":"fb8daa95-9602-11ec-9adb-0242ac110003:1-35",
            "offset":{
                "ts_sec":"0",
                "file":"mysql-bin.000003",
                "pos":"11790",
                "gtids":"fb8daa95-9602-11ec-9adb-0242ac110003:1-35",
                "row":"0",
                "event":"0"
            },
            "position":11790,
            "restartSkipEvents":0,
            "restartSkipRows":0,
            "serverId":0,
            "timestamp":0
        },
        "endingOffset":{
            "filename":"",
            "offset":{
                "ts_sec":"0",
                "file":"",
                "pos":"-9223372036854775808",
                "row":"0",
                "event":"0"
            },
            "position":-9223372036854775808,
            "restartSkipEvents":0,
            "restartSkipRows":0,
            "serverId":0,
            "timestamp":0
        },
        "totalFinishedSplitSize":0
    }
  2. The SourceRecord that is read looks like this:

    SourceRecord{
        sourcePartition={server=mysql_binlog_source},
        sourceOffset={transaction_id=null, ts_sec=1645771455, file=mysql-bin.000003, pos=11855, gtids=a5adbd99-9603-11ec-a0a1-0242ac110003:1-35, row=1, server_id=223344, event=2}
    }
    ConnectRecord{
        topic='mysql_binlog_source.customer_va47fu.customers',
        kafkaPartition=null,
        key=Struct{id=103},
        keySchema=Schema{mysql_binlog_source.customer_va47fu.customers.Key:STRUCT},
        value=Struct{
            before=Struct{id=103,name=user_3,address=Shanghai,phone_number=123567891234},
            after=Struct{id=103,name=user_3,address=Hangzhou,phone_number=123567891234},
            source=Struct{
                version=1.5.4.Final,
                connector=mysql,
                name=mysql_binlog_source,
                ts_ms=1645771455000,
                db=customer_va47fu,
                table=customers,
                server_id=223344,
                gtid=a5adbd99-9603-11ec-a0a1-0242ac110003:36,
                file=mysql-bin.000003,
                pos=11995,
                row=0
            },
            op=u,
            ts_ms=1645771666855
        },
        valueSchema=Schema{mysql_binlog_source.customer_va47fu.customers.Envelope:STRUCT},
        timestamp=null,
        headers=ConnectHeaders(headers=)
    }
  3. After one change event has been read, the MySqlBinlogSplit changes as follows:

    {
        "binlogSplit":true,
        "completedSplit":true,
        "startingOffset":{
            "filename":"mysql-bin.000003",
            "gtidSet":"a5adbd99-9603-11ec-a0a1-0242ac110003:1-35",
            "offset":{
                "ts_sec":"1645771455",
                "file":"mysql-bin.000003",
                "pos":"11855", // the starting position moved from 11790 to 11855
                "gtids":"a5adbd99-9603-11ec-a0a1-0242ac110003:1-35",
                "row":"1",
                "server_id":"223344",
                "event":"2"
            },
            "position":11855,
            "restartSkipEvents":2,
            "restartSkipRows":1,
            "serverId":223344,
            "timestamp":1645771455
        },
        "endingOffset":{
            "filename":"",
            "offset":{
                "ts_sec":"0",
                "file":"",
                "pos":"-9223372036854775808",
                "row":"0",
                "event":"0"
            },
            "position":-9223372036854775808,
            "restartSkipEvents":0,
            "restartSkipRows":0,
            "serverId":0,
            "timestamp":0
        },
        "finishedSnapshotSplitInfos":[],
        "snapshotSplit":false,
        "totalFinishedSplitSize":0
    }

MySqlSplitReader

public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {

    /** Splits assigned to this reader. */
    private final Queue<MySqlSplit> splits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;

    /**
     * The reader type is chosen from the type of the MySqlSplit being processed:
     * MySqlSnapshotSplit -> SnapshotSplitReader
     * MySqlBinlogSplit   -> BinlogSplitReader
     *
     * Both classes implement DebeziumReader and reuse the reader machinery from 1.x.
     */
    @Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
    @Nullable private String currentSplitId;

    public MySqlSplitReader(MySqlSourceConfig sourceConfig, int subtaskId) {
        this.sourceConfig = sourceConfig;
        this.subtaskId = subtaskId;
        this.splits = new ArrayDeque<>();
    }

    @Override
    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        // create the matching DebeziumReader for the current MySqlSplit type
        checkSplitOrStartNext();
        Iterator<SourceRecord> dataIt = null;
        try {
            dataIt = currentReader.pollSplitRecords();
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
        return dataIt == null
                ? finishedSnapshotSplit()
                : MySqlRecords.forRecords(currentSplitId, dataIt);
    }
}

BinlogSplitReaderTest

The SnapshotSplitReader runs first, then the BinlogSplitReader.

public class BinlogSplitReaderTest extends MySqlSourceTestBase {

    // a dedicated database instance for this test run
    private final UniqueDatabase customerDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    private BinaryLogClient binaryLogClient;
    private MySqlConnection mySqlConnection;

    @Test
    public void testReadSingleBinlogSplit() throws Exception {
        // create the database and run its SQL init script
        customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
        binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
        mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
        final DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("address", DataTypes.STRING()),
                        DataTypes.FIELD("phone_number", DataTypes.STRING()));

        /*
         * Derive the MySqlSnapshotSplit list from the current rows of
         * customers_even_dist:
         * 1. (null,105)
         * 2. (105,109)
         * 3. (109,null)
         */
        List<MySqlSnapshotSplit> splits =
                getMySqlSplits(new String[] {"customers_even_dist"}, sourceConfig);

        String[] expected =
                new String[] {
                    "+I[101, user_1, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "-D[102, user_2, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "-U[103, user_3, Shanghai, 123567891234]",
                    "+U[103, user_3, Hangzhou, 123567891234]",
                    "-U[103, user_3, Hangzhou, 123567891234]",
                    "+U[103, user_3, Shanghai, 123567891234]",
                    "+I[104, user_4, Shanghai, 123567891234]",
                    "+I[103, user_3, Shanghai, 123567891234]"
                };

        List<String> actual =
                readBinlogSplits(
                        splits,
                        dataType,
                        sourceConfig,
                        // scanSplitsNum is 1, so only the first split range (null,105) is read
                        1,
                        expected.length,
                        splits.get(splits.size() - 1).getTableId());

        // In any order
        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
    }

    /** Derives the MySqlSnapshotSplit list from the current rows of the captured tables. */
    private List<MySqlSnapshotSplit> getMySqlSplits(
            String[] captureTables, MySqlSourceConfig sourceConfig) {
        List<String> captureTableIds =
                Arrays.stream(captureTables)
                        .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
                        .collect(Collectors.toList());
        List<TableId> remainingTables =
                captureTableIds.stream().map(TableId::parse).collect(Collectors.toList());

        final MySqlSnapshotSplitAssigner assigner =
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
        assigner.open();
        List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
        while (true) {
            Optional<MySqlSplit> mySqlSplit = assigner.getNext();
            if (mySqlSplit.isPresent()) {
                mySqlSplits.add(mySqlSplit.get().asSnapshotSplit());
            } else {
                break;
            }
        }
        assigner.close();
        return mySqlSplits;
    }

    /** Reads the split data with the SplitReaders. */
    private List<String> readBinlogSplits(
            List<MySqlSnapshotSplit> sqlSplits,
            DataType dataType,
            MySqlSourceConfig sourceConfig,
            int scanSplitsNum,
            int expectedSize,
            TableId binlogChangeTableId)
            throws Exception {
        final StatefulTaskContext statefulTaskContext =
                new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
        final SnapshotSplitReader snapshotSplitReader =
                new SnapshotSplitReader(statefulTaskContext, 0);

        // step-1: read the snapshot splits first
        List<SourceRecord> fetchedRecords = new ArrayList<>();
        for (int i = 0; i < scanSplitsNum; i++) {
            MySqlSplit sqlSplit = sqlSplits.get(i);
            if (snapshotSplitReader.isFinished()) {
                snapshotSplitReader.submitSplit(sqlSplit);
            }
            Iterator<SourceRecord> res;
            while ((res = snapshotSplitReader.pollSplitRecords()) != null) {
                while (res.hasNext()) {
                    SourceRecord sourceRecord = res.next();
                    fetchedRecords.add(sourceRecord);
                }
            }
        }
        /*
         * The fetched result consists of lw, hw and the 4 chunk rows:
         *
         * Struct{split_id=customer_1bioqsl.customers_even_dist:0,watermark_kind=LOW}
         *
         * Struct{after=Struct{id=101,name=user_1,address=Shanghai,phone_number=123567891234},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,db=customer_1bioqsl,table=customers_even_dist,server_id=0,file=,pos=0,row=0},op=r,ts_ms=1645779411405}
         *
         * Struct{after=Struct{id=104,name=user_4,address=Shanghai,phone_number=123567891234},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,db=customer_1bioqsl,table=customers_even_dist,server_id=0,file=,pos=0,row=0},op=r,ts_ms=1645779411412}
         *
         * Struct{after=Struct{id=102,name=user_2,address=Shanghai,phone_number=123567891234},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,db=customer_1bioqsl,table=customers_even_dist,server_id=0,file=,pos=0,row=0},op=r,ts_ms=1645779411411}
         *
         * Struct{after=Struct{id=103,name=user_3,address=Shanghai,phone_number=123567891234},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,db=customer_1bioqsl,table=customers_even_dist,server_id=0,file=,pos=0,row=0},op=r,ts_ms=1645779411412}
         *
         * Struct{split_id=customer_1bioqsl.customers_even_dist:0,watermark_kind=HIGH}
         */

        // step-2: create the binlog split from the finished snapshot splits
        // collect the info of the finished snapshot splits; here the
        // FinishedSnapshotSplitInfo high-watermark offset is 11837
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
                getFinishedSplitsInfo(sqlSplits, fetchedRecords);

        // the smallest hw offset among the finished snapshot splits becomes
        // the starting point of the binlog split
        BinlogOffset startingOffset = getStartingOffsetOfBinlogSplit(finishedSplitsInfo);
        Map<TableId, TableChange> tableSchemas = new HashMap<>();
        for (MySqlSplit mySqlSplit : sqlSplits) {
            tableSchemas.putAll(mySqlSplit.getTableSchemas());
        }

        // create the binlog split
        MySqlSplit binlogSplit =
                new MySqlBinlogSplit(
                        "binlog-split",
                        startingOffset,
                        // NO_STOPPING_OFFSET: read the binlog without an end position
                        BinlogOffset.NO_STOPPING_OFFSET,
                        finishedSplitsInfo,
                        tableSchemas,
                        finishedSplitsInfo.size());

        // step-3: test reading the binlog split
        BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
        binlogReader.submitSplit(binlogSplit);

        // step-4: make some binlog events
        /*
         * "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103"
         * "DELETE FROM " + tableId + " where id = 102"
         * "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')"
         * "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"
         *
         * Struct {before 103, after 103}
         * Struct {before 102}
         * Struct {after 102}
         * Struct {before 103, after 103}
         */
        if (binlogChangeTableId.table().contains("customers")) {
            makeCustomersBinlogEvents(
                    statefulTaskContext.getConnection(),
                    binlogChangeTableId.toString(),
                    scanSplitsNum == 1);
        } else {
            makeCustomerCardsBinlogEvents(
                    statefulTaskContext.getConnection(), binlogChangeTableId.toString());
        }

        // step-5: fetch all produced binlog data and format it
        List<String> actual = new ArrayList<>();
        Iterator<SourceRecord> recordIterator;
        while ((recordIterator = binlogReader.pollSplitRecords()) != null) {
            while (recordIterator.hasNext()) {
                fetchedRecords.add(recordIterator.next());
            }
            actual.addAll(formatResult(fetchedRecords, dataType));
            fetchedRecords.clear();
            if (actual.size() >= expectedSize) {
                break;
            }
        }
        return actual;
    }

    /** Builds the top-level MySqlSource configuration for this test case. */
    private MySqlSourceConfig getConfig(String[] captureTables) {
        String[] captureTableIds =
                Arrays.stream(captureTables)
                        .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
                        .toArray(String[]::new);

        return new MySqlSourceConfigFactory()
                .databaseList(customerDatabase.getDatabaseName())
                .tableList(captureTableIds)
                .hostname(MYSQL_CONTAINER.getHost())
                .port(MYSQL_CONTAINER.getDatabasePort())
                .username(customerDatabase.getUsername())
                // snapshot chunk size: 4 rows per split
                .splitSize(4)
                // fetch 2 rows per poll
                .fetchSize(2)
                .password(customerDatabase.getPassword())
                .createConfig(0);
    }
}

The SQL script executed by this test class is as follows:

-- create table whose split key is evenly distributed
CREATE TABLE customers_even_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);

-- with splitSize = 4, the 10 inserted rows are divided into 3 SnapshotSplits
-- (intervals closed on the left, open on the right):
-- (null,105): 101, 102, 103, 104
-- [105,109): 105, 106, 107, 108
-- [109,null): 109, 110
INSERT INTO customers_even_dist
VALUES (101,'user_1','Shanghai','123567891234'),
(102,'user_2','Shanghai','123567891234'),
(103,'user_3','Shanghai','123567891234'),
(104,'user_4','Shanghai','123567891234'),
(105,'user_5','Shanghai','123567891234'),
(106,'user_6','Shanghai','123567891234'),
(107,'user_7','Shanghai','123567891234'),
(108,'user_8','Shanghai','123567891234'),
(109,'user_9','Shanghai','123567891234'),
(110,'user_10','Shanghai','123567891234');
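
How those boundaries come about can be seen from a toy rendition of the even-distribution chunking below. This just reproduces the arithmetic for splitSize = 4 on keys 101..110; the real MySqlSnapshotSplitAssigner also handles uneven key distributions and sparse keys.

import java.util.ArrayList;
import java.util.List;

/** Sketch: how splitSize = 4 turns keys 101..110 into the three chunks above. */
public class ChunkBoundarySketch {
    public static void main(String[] args) {
        long min = 101, max = 110, splitSize = 4;
        List<String> chunks = new ArrayList<>();
        Long start = null;                          // null = unbounded lower end
        for (long next = min + splitSize; next <= max; next += splitSize) {
            chunks.add("[" + (start == null ? "null" : start) + ", " + next + ")");
            start = next;
        }
        chunks.add("[" + start + ", null)");        // unbounded upper end
        System.out.println(chunks);                 // [[null, 105), [105, 109), [109, null)]
    }
}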

References

Percona article: the impact of the FLUSH TABLES WITH READ LOCK command
Flink CDC 2.0: an analysis of the implementation principles