End-to-End Exactly-Once Processing in Apache Flink

2017年发版的Flink 1.4.0,引进了一个里程碑式的新特性:TwoPhaseCommitSinkFunction ,抽象了两阶段提交协议的通用逻辑,相关联的jira单,并在Kafka Producer的connector中实现了它,支持了对外部Kafka sink的exactly-once语义。在此基础上,结合 Flink、sources、sinks 和 Kafka 0.11以上的版本,可以实现端到端一致性语义的应用,需要用户实现少数方法来实现一致性语义。
在这篇博客中,介绍以下几个方面:

  1. 描述 Flink checkpoints 在保证一致性语义中的角色作用
  2. 展示Flink如何通过两阶段提交协议与sources和sinks交互,以提供端到端的一致性语义
  3. 通过一个简单的示例,通过使用 TwoPhaseCommitSinkFunction ,实现落地到文件目的端的一致性语义

Flink中的exactly-once语义

Exactly-once语义简称EOS,指的是每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次。保证即使在机器或应用发生故障的情况下,也不会有重复数据或者漏处理的数据。
Flink应用内部的 exactly-once 是通过 Flink 的 checkpointing算法实现的,Flink 做 checkpoint 的一致性快照中的内容有:

  • 应用的当前state
  • 输入流的位置

Flink以可配置的间隔生成 checkpoints ,并持久化到存储系统,如 S3、HDFS 等。这个持久化过程是异步的,也就是说,Flink 在 checkpointing 的过程中会继续处理数据。当机器或应用发生故障后重启,Flink 会从最近一个成功完成的 checkpoint 恢复,恢复应用的state,回滚到输入流的消费位置,就像故障从来没发生过一样。

在 Flink 1.4.0 之前,exactly-once 语义仅限于 Flink 应用内部,不涉及到外部系统。为了实现端到端的 exactly-once 语义,外部系统必须与Flink checkpoint 协调处理提供一种能提交与回滚的写入方式。常见的协调提交与回滚操作的途径就是两阶段提交协议。

Flink中的端到端的exactly-once应用

Flink一直宣称自己支持EOS,实际上主要是针对Flink应用内部来说的,对于外部系统(端到端)则有比较强的限制:

  • 外部系统写入支持幂等性
  • 外部系统支持以事务的方式写入

Kafka的幂等性和事务

Kafka 在0.11版本之前只能保证 at-least-once 和 at-most-once 语义,从0.11版本开始,引入了幂等发送和事务,从而实现 exactly-once 语义。

幂等性

未引入幂等性之前,Kafka正常发送和重试发送消息的流程图如下:
avatar

未引入幂等性之前,Kafka重试发送消息的流程图如下,可能会造成数据重复:
avatar

为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。
Producer发送每条消息<Topic, Partition>对于Sequence Number会从0开始单调递增,broker端会为每个<PID, Topic, Partition>维护一个序号,每次commit一条消息此序号加一,对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大1以上,则Broker会接受它,否则将其丢弃:

序号比Broker维护的序号大1以上,说明存在乱序。
序号比Broker维护的序号小,说明此消息以及被保存,为重复数据。

有了幂等性,Kafka正常发送和重试发送消息流程图如下:
avatar

avatar

幂等性机制仅解决了单分区上的数据重复和乱序问题,对于跨session和所有分区的重复和乱序问题不能得到解决。于是需要引入事务。

事务

// 待补充

端到端的exactly-once实现过程

Flink 支持的端到端的 exactly-once 不仅限于 Kafka,可以应用到其他的source/sink,只要它们提供了必需的协调机制。
以下示例中:

  • 从Kafka读取数据的opertor(KafkaConsumer)
  • 窗口聚合操作
  • 写入到Kafka目的源的operator(KafkaProducer),两个checkpoints之间的待写入数据集合,必须是事务操作,当发生故障时要能回滚。

avatar

预提交阶段

一次 checkpoint 的开始代表两阶段提交协议中的预提交阶段。Flink JobManager 向数据流中注入一个 checkpoint barrier ,分隔两次 checkpoints 之间的数据集合。barrier 从一个 operator 流向下一个 operator,每个 operator 接收到 barrier 之后就会触发 state backend 操作,做 state 快照。

avatar

Data Source存储它消费到Kafka位置,完成之后向下一个 operator 传递 barrier 。内部状态是指所有能通过 Flink state 存储和管理的状态。例如,窗口求和的状态。处理过程中如果只有内部状态,那么在与提交阶段就只需要持久化state,不需要额外的操作,因为Flink 会接管state的持久化操作,state 成功写入就进行 commit 操作,发生故障时会丢弃这些 state。

avatar

然而,如果处理过程中涉及到外部状态,外部状态来自于写入外部系统(如Kafka)时。为了保证 exactly-once ,外部系统必须支持整合二阶段提交协议的事务操作。在预提交阶段,data sink除了存储预提交的外部状态,还要进行事务的预提交操作。

avatar

当checkpoint barrier传递到所有的operators之后,代表预提交阶段完成。所有触发的state快照被认为是checkpoint的一部分,checkpoint 是整个应用的state快照,包括预提交的外部状态。发生故障时,就可以从上次的完整快照恢复。下一步就进入提交阶段了,JobManager通知所有的operator checkpoint已成功完成,处理每个 operator 的checkpoint完成的回调。data source 和 window operator 没有外部状态,因此在提交阶段不需要做任何操作,data sink有外部状态,外部系统的事务操作需要进行真正的提交操作了。

avatar

让我们总结一下上面的过程:

  • 一旦所有的 operators 完成了预提交,就进行提交操作
  • 如果其中一个operator的预提交操作失败,其他的预提交操作就会终止,并回滚到上一次成功执行的checkpoint中记录的状态
  • 预提交成功之后,提交操作必须保证成功,所有的 operators 和 外部系统都要参与其中。如果提交失败(例如由于偶发性的网络问题),整个Flink应用就会失败,应用会根据用户设置的重启策略进行重启,重试提交操作。这个过程是很关键的,因为如果提交操作最终没有成功,将会导致数据丢失。

因此,我们可以确保所有的operators的对最终的checkpoint 结果都是一致的:数据不是被提交了,就是提交被终止并回滚了。

TwoPhaseCommitSinkFunction

使用idea自带的Diagrams插件,查看TwoPhaseCommitSinkFunction类的继承关系:
avatar

几个重要的方法

initializeState(FunctionSnapshotContext context)

初始化状态:直接开启一个新事务;
从已有的状态中恢复:
读取 state 中记录的待提交事务 pendingCommitTransactions 列表,执行状态恢复和事务commit操作,最后清空 pendingCommitTransactions 列表;
读取 state 中记录的 pending 状态的事务列表,执行事务abort操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

state = context.getOperatorStateStore().getListState(stateDescriptor);

boolean recoveredUserContext = false;
if (context.isRestored()) {
// 开始读取snapshot中的state,进行状态恢复操作
LOG.info("{} - restoring state", name());
for (State<TXN, CONTEXT> operatorState : state.get()) {
userContext = operatorState.getContext();
// 获取从state中读取的待提交的事务列表
List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually, there is actually data loss
// 恢复状态的同时会执行提交操作
recoverAndCommitInternal(recoveredTransaction);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}

recoverAndAbort(operatorState.getPendingTransaction().handle);
LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());

if (userContext.isPresent()) {
finishRecoveringContext();
recoveredUserContext = true;
}
}
}

// if in restore we didn't get any userContext or we are initializing from scratch
if (!recoveredUserContext) {
LOG.info("{} - no state to restore", name());

userContext = initializeUserContext();
}
// 清除 pendingCommitTransactions 集合
this.pendingCommitTransactions.clear();

// 开启事务
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
}

snapshotState(FunctionSnapshotContext context)

两阶段提交 Function 做 state 快照,从 context 中取出当前的 checkpointId;
接着执行预提交操作,并把待提交的事务加到 pendingCommitTransactions 列表中;
开启一个新的事务;
存储新的state。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction

checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

// 从上下文中取得当前的checkpointId,是从0开始递增
long checkpointId = context.getCheckpointId();
LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

// 执行预提交操作,传入事务操作类,进行事务的预提交操作
preCommit(currentTransactionHolder.handle);
// 执行过预提交操作的事务放入pendingCommitTransactions集合,
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

// 清除上一次的state
state.clear();
// 存储新的状态
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}

notifyCheckpointComplete(long checkpointId)

两阶段提交Function执行notify操作,遍历待提交事务列表,执行commit操作;
执行完 commit 操作的事务,从 pendingCommitTransactions 中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
// the following scenarios are possible here
//
// (1) there is exactly one transaction from the latest checkpoint that
// was triggered and completed. That should be the common case.
// Simply commit that transaction in that case.
//
// (2) there are multiple pending transactions because one previous
// checkpoint was skipped. That is a rare case, but can happen
// for example when:
//
// - the master cannot persist the metadata of the last
// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other tasks could not persist their status during
// the previous checkpoint, but did not trigger a failure because they
// could hold onto their state and could successfully persist it in
// a successive checkpoint (the one notified here)
//
// In both cases, the prior checkpoint never reach a committed state, but
// this checkpoint is always expected to subsume the prior one and cover all
// changes since the last successful one. As a consequence, we need to commit
// all pending transactions.
//
// (3) Multiple transactions are pending, but the checkpoint complete notification
// relates not to the latest. That is possible, because notification messages
// can be delayed (in an extreme case till arrive after a succeeding checkpoint
// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
//

Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;

while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
// 如果待提交的事务的 checkpointId 大于当前通知的已完成的checkpointId,则循环继续下一个
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}

LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

// 如果事务执行时间相对较长的,可以打印出警告日志
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
// 执行commit操作
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}

LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

pendingTransactionIterator.remove();
}

if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}

需要子类实现的方法

  • invoke:接收到数据,处理数据
  • beginTransaction:开启事物,做一些准备工作
  • preCommit:预提交阶段,进行预提交操作
  • commit:提交阶段,进行真正的提交操作
  • abort:回滚操作,撤回预提交阶段的操作

实现file sink exactly-once的示例

实现逻辑

要实现上面的过程会有点复杂,因此Flink抽象出一个TwoPhaseCommitSinkFunction类。
假设写入的目的端是磁盘文件系统 file sink,我们只需要实现以下4个方法就可以实现file sink 的 exactly-once 语义:

  • invoke:接收到数据,将数据写入临时文件
  • beginTransaction:开启事物,在目标文件系统的临时目录创建临时文件,创建写文件的writer,后续可以向临时文件中写入数据
  • preCommit:预提交阶段,刷写并关闭文件,永远不会再向这个文件写入了。并为下一次checkpoint启用新的事务
  • commit:提交阶段,将临时文件从临时目录移动到正式目录,显然这会让目的端数据延时可见
  • abort:删除临时文件

当Flink任务发生故障时,所有的 operators 会恢复成最近一次成功生成的 checkpoint 中的状态。对于预提交的事务,我们必须存储足够的状态信息,以便在重启的时候决定提交还是回滚事务。在本例子中,存储的应该是临时文件和真正的存储目录。TwoPhaseCommitSinkFunction 考虑到了这个场景,从checkpoint中恢复状态时总是进行提交操作。我们应该确保写入目的端的提交操作是幂等的,通常情况下,这也不是一个必须要考虑的问题。在本例子中,临时文件不在临时目录中,就是已经被移动到真正的存储目录中了。

可以查看 Flink源码中自带的两阶段提交测试示例
自定义实现两阶段提交的function ContentDumpSinkFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 实现两阶段提交的function
*/
private class ContentDumpSinkFunction extends TwoPhaseCommitSinkFunction<String, ContentTransaction, Void> {

public ContentDumpSinkFunction() {
super(
new KryoSerializer<>(ContentTransaction.class, new ExecutionConfig()),
VoidSerializer.INSTANCE, clock);
}

/**
* 接收到事件,invoke处理事件
*/
@Override
protected void invoke(ContentTransaction transaction, String value, Context context) throws Exception {
transaction.tmpContentWriter.write(value);
}

/**
* 开启事务
*/
@Override
protected ContentTransaction beginTransaction() throws Exception {
// 开启事务,创建写文件的writer,文件名为随机生成的UUID
return new ContentTransaction(tmpDirectory.createWriter(UUID.randomUUID().toString()));
}

/**
* 预提交阶段
*/
@Override
protected void preCommit(ContentTransaction transaction) throws Exception {
// 刷写到临时文件并关闭文件,因为再也不会向这个临时文件中写入数据
transaction.tmpContentWriter.flush();
transaction.tmpContentWriter.close();
}

/**
* 提交阶段
*/
@Override
protected void commit(ContentTransaction transaction) {
if (throwException.get()) {
throw new RuntimeException("Expected exception");
}

// 转移tmp目录下的文件到target目录下
ContentDump.move(
transaction.tmpContentWriter.getName(),
tmpDirectory,
targetDirectory);

}

/**
* abort,进行回滚操作,把预提交阶段刷写的临时文件删除
*/
@Override
protected void abort(ContentTransaction transaction) {
transaction.tmpContentWriter.close();
tmpDirectory.delete(transaction.tmpContentWriter.getName());
}
}

定义事务操作类 ContentTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 事务操作类
*/
private static class ContentTransaction {
private ContentDump.ContentWriter tmpContentWriter;

public ContentTransaction(ContentDump.ContentWriter tmpContentWriter) {
this.tmpContentWriter = tmpContentWriter;
}

@Override
public String toString() {
return String.format("ContentTransaction[%s]", tmpContentWriter.getName());
}
}

模拟内存文件读写操作的应用类 ContentDump

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* Utility class to simulate in memory file like writes, flushes and closing.
*/
public class ContentDump {
private boolean writable = true;
/**
* filename,List<String>
*/
private Map<String, List<String>> filesContent = new HashMap<>();

public Set<String> listFiles() {
return new HashSet<>(filesContent.keySet());
}

public void setWritable(boolean writable) {
this.writable = writable;
}

/**
* Creates an empty file.
*/
public ContentWriter createWriter(String name) {
checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
filesContent.put(name, new ArrayList<>());
return new ContentWriter(name, this);
}

public static void move(String name, ContentDump source, ContentDump target) {
Collection<String> content = source.read(name);
try (ContentWriter contentWriter = target.createWriter(name)) {
contentWriter.write(content).flush();
}
source.delete(name);
}

public void delete(String name) {
filesContent.remove(name);
}

public Collection<String> read(String name) {
List<String> content = filesContent.get(name);
checkState(content != null, "Unknown file [%s]", name);
List<String> result = new ArrayList<>(content);
return result;
}

private void putContent(String name, List<String> values) {
List<String> content = filesContent.get(name);
checkState(content != null, "Unknown file [%s]", name);
if (!writable) {
throw new NotWritableException(name);
}
content.addAll(values);
}

/**
* {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}.
*/
public static class ContentWriter implements AutoCloseable {
private final ContentDump contentDump;
private final String name;
private final List<String> buffer = new ArrayList<>();
private boolean closed = false;

private ContentWriter(String name, ContentDump contentDump) {
this.name = checkNotNull(name);
this.contentDump = checkNotNull(contentDump);
}

public String getName() {
return name;
}

public ContentWriter write(String value) {
checkState(!closed);
buffer.add(value);
return this;
}

public ContentWriter write(Collection<String> values) {
values.forEach(this::write);
return this;
}

public ContentWriter flush() {
contentDump.putContent(name, buffer);
return this;
}

public void close() {
buffer.clear();
closed = true;
}
}

/**
* Exception thrown for an attempt to write into read-only {@link ContentDump}.
*/
public class NotWritableException extends RuntimeException {
public NotWritableException(String name) {
super(String.format("File [%s] is not writable", name));
}
}
}

测试

场景1:测试完整的两阶段提交流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 发送element "42" - 0
* 发送snapshot 0 - 1
* 发送element "43" - 2
* 发送snapshot 1 - 3
* 发送element "44" - 4
* 发送snapshot 2 - 5
* <p>
* 测试场景:
* 测试完整的两阶段提交流程,通知checkpointId-1完成,那写入到target目录的数据只有"42","43"
* 此时,在tmp目录下状态数据应该有一个checkpointId-2和数据"44"
*/
@Test
public void testNotifyOfCompletedCheckpoint() throws Exception {
// 准备工作:initializeEmptyState、initializeState()方法中会进行开启事务操作、打开userFunction、初始化SimpleContext
// 开启事务操作,会创建一个临时文件1
harness.open();
// 向临时文件1中写入数据42
harness.processElement("42", 0);
// 给checkpointId-0做snapshot,执行预提交,刷写并关闭临时文件1;开启事务操作,创建一个临时文件2
harness.snapshot(0, 1);
// 向临时文件2中写入数据43
harness.processElement("43", 2);
// 给checkpointId-1做snapshot,执行预提交,刷写并关闭临时文件2;开启事务操作,创建一个临时文件3
harness.snapshot(1, 3);
// 向临时文件3中写入数据44
harness.processElement("44", 4);
// 给checkpointId-2做snapshot,执行预提交,刷写并关闭临时文件3;开启事务操作,创建一个临时文件4
harness.snapshot(2, 5);
// notify checkpointId-1 完成,才会执行提交操作,把checkpointId-1之前的临时文件都移动正式目录
harness.notifyOfCompletedCheckpoint(1);

for (String name : targetDirectory.listFiles()) {
System.out.println("File " + name + " in target directory. content " + targetDirectory.read(name));
}

for (String name : tmpDirectory.listFiles()) {
System.out.println("File " + name + " in tmp directory. content " + tmpDirectory.read(name));
}

assertExactlyOnce(Arrays.asList("42", "43"));
// one for checkpointId 2 and second for the currentTransaction
assertEquals(2, tmpDirectory.listFiles().size());
}

运行结果:
通知checkpointId-1完成,此时会执行提交操作。在checkpointId-1 之前处理的数据只有42、43,前两次做snapshot产生了两个临时文件,提交阶段会将这两个临时文件移动到目的目录。
initializeState() 方法中会执行 beginTransactionInternal(),创建一个临时文件。
接着在做snapshot的过程中 snapshotState() 方法中会先执行 preCommit() 预提交操作,关闭之前的临时文件;再执行 beginTransactionInternal(),创建一个临时文件。

1
2
3
4
5
File 5562e15f-7c27-447c-80e9-b1d773141bec in target directory. content [42]
File e7df0b52-e27a-45c1-b804-26d970d87912 in target directory. content [43]

File 12583208-4bef-412d-95d8-76f8bf0a0a75 in tmp directory. content []
File 6bfd6970-d58e-4aea-979d-dd7dd37d77ca in tmp directory. content [44]

场景2:测试通知checkpoint完成之前出现故障

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 测试场景:
* 没有执行notifyOfCompletedCheckpoint之前程序挂了,恢复时从checkpointId-1恢复
* 应该保证target目录中的数据只有"42","43"
* 临时文件在close时被清除,无状态数据
*/
@Test
public void testFailBeforeNotify() throws Exception {
harness.open();
harness.processElement("42", 0);
harness.snapshot(0, 1);
harness.processElement("43", 2);
OperatorSubtaskState snapshot = harness.snapshot(1, 3);

// 模拟在执行notifyOfCompletedCheckpoint之前出故障的场景
// 设置临时目录不可写入,为了制造不可写入的异常,以方便捕获它
tmpDirectory.setWritable(false);
try {
// 执行到这一步因不可写入直接走到catch
harness.processElement("44", 4);
harness.snapshot(2, 5);
fail("something should fail");
} catch (Exception ex) {
if (!(ex.getCause() instanceof ContentDump.NotWritableException)) {
throw ex;
}
// ignore
}
// 模拟发生异常之后退出流处理流程
closeTestHarness();

// 设置临时目录可写
tmpDirectory.setWritable(true);

// 从快照checkpointId-1恢复state,恢复的同时会进行提交操作,会把checkpointId-1之前的临时文件移动到目的目录中
setUpTestHarness();
harness.initializeState(snapshot);

// close操作最终会调用TwoPhaseCommitSinkFunction中的close方法,会调用事务的abort()方法,abort()方法中会清除临时文件
closeTestHarness();

for (String name : targetDirectory.listFiles()) {
System.out.println("File " + name + " in target directory. content " + targetDirectory.read(name));
}

if (tmpDirectory.listFiles().isEmpty()) {
System.out.println("There is no file in tmp directory.");
}

for (String name : tmpDirectory.listFiles()) {
System.out.println("File " + name + " in tmp directory. content " + tmpDirectory.read(name));
}

assertExactlyOnce(Arrays.asList("42", "43"));
assertEquals(0, tmpDirectory.listFiles().size());
}

运行结果:
重启之后,从checkpointId-1记录的snapshot恢复,initializeSize会读取状态并进行commit操作,42、43被成功提交了。
关闭会执行到AbstractUdfStreamOperator.close(),进而执行到 TwoPhaseCommitSinkFunction.close() -> ContentDumpSinkFunction.abort(),abort()方法中会清除临时文件。

1
2
3
4
File e3154038-939d-4b07-bde2-a8a399a3a648 in target directory. content [43]
File bbfca9d4-ea4a-46d8-9c91-79c0938875a3 in target directory. content [42]

There is no file in tmp directory.

场景3:忽略commit阶段超时提交的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 测试场景:
* 当执行commit操作时,如果设置了忽略提交超时异常,并且也确定是超时了,则不会向外抛出异常了
*/
@Test
public void testIgnoreCommitExceptionDuringRecovery() throws Exception {
clock.setEpochMilli(0);

harness.open();
harness.processElement("42", 0);

final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
// 此时,42数据已经被写入目标目录,checkpointId-1 大于当前的0,可以执行到提交操作
harness.notifyOfCompletedCheckpoint(1);

for (String name : targetDirectory.listFiles()) {
System.out.println("File " + name + " in target directory. content " + targetDirectory.read(name));
}

// 人为设置提交时抛出异常
throwException.set(true);

closeTestHarness();
setUpTestHarness();

final long transactionTimeout = 1000;
// 设置事务超时时长
sinkFunction.setTransactionTimeout(transactionTimeout);
// 事务一直提交失败,已经超时了,设置跳过
sinkFunction.ignoreFailuresAfterTransactionTimeout();

try {
// 从snapshot恢复state,从checkpointId-0处恢复,会执行commit操作,此时commit操作中就会抛出异常
harness.initializeState(snapshot);
fail("Expected exception not thrown");
} catch (RuntimeException e) {
assertEquals("Expected exception", e.getMessage());
}

clock.setEpochMilli(transactionTimeout + 1);
// 从snapshot恢复state,从checkpointId-0处恢复,会执行commit操作,此时commit操作中就会抛出异常
// 如果设置了忽略事务提交超时异常,并且确实发生了超时异常,则忽略此异常,外层就不用捕获了
harness.initializeState(snapshot);

assertExactlyOnce(Collections.singletonList("42"));
}

运行结果:

1
File cd349e67-b6aa-4d04-8150-79e4f80f7058 in target directory. content [42]

这个结果在前面的通知阶段就已经生成了。重启服务进行状态初始化,都会进行commit操作,这个操作可能一直失败直到超时,有时我们想放程序执行而不抛出异常,就可以通过设置以下两个属性:
sinkFunction.setTransactionTimeout(transactionTimeout);
sinkFunction.ignoreFailuresAfterTransactionTimeout();