Flink-FLIP-150: Introduce Hybrid Source

In practice, and especially when historical data has to be backfilled, many Flink jobs need to read from several data sources one after another:

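  • CDC: users may keep a snapshot of the data in HDFS/S3 and the change log in the database binlog or in Kafka.
  • Machine learning feature backfill: when a new feature is added to a model, it has to be computed from raw data covering the past up to the present. In most cases historical and real-time data live in two different storage systems, such as HDFS and Kafka.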

Basic Idea

Previously, reading from these different sources meant running two separate Flink jobs, which has several drawbacks:

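  • Each connector is implemented differently, so switching between sources is complex: the exact state of the upstream source has to be controlled before the switch, and the downstream source has to turn that state into its own initial state.
  • There is no effective mechanism for migrating smoothly from historical to real-time data. A smooth migration requires rules that define when and how to switch, so that the data stays complete and consistent.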

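To read both sources seamlessly, a single Flink job should first read historical data from HDFS and then switch to Kafka for real-time data. This calls for a hybrid source API built on top of FLIP-27.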

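A hybrid source holds a list of concrete sources and reads them in the order they were defined. When source A has been fully read, the hybrid source switches to the next source B. This requires that: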

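  • the SplitEnumerator of the current source provides the end position it has read up to;
  • the next source supports setting a start position;
  • the user provides a custom function that converts the end position of the current source into the start position of the next source.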

Usage Examples

Examples of switching inside a HybridSource:

A FileSource followed by a KafkaSource, with Kafka starting from a position fixed in advance (here, the earliest offsets):

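import java.util.Arrays;

import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.fs.Path;
import org.apache.kafka.common.serialization.StringDeserializer;

// testDir is a java.io.File pointing to the directory that holds the historical data
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();

// Lower-case variable name: calling it KafkaSource would obscure the class and break KafkaSource.<String>builder()
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9091")
                .setGroupId("MyGroup")
                .setTopics(Arrays.asList("quickstart-events"))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

// Read all files first, then switch to Kafka
HybridSource<String> hybridSource =
        HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();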

A more advanced example, in which the Kafka starting position is derived from the previous source:

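import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
// The remaining imports are the same as in the previous example

HybridSource<String> hybridSource =
        HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
                .addSource(
                        switchContext -> {
                            // Enumerator of the previous (file) source, which has finished by now
                            StaticFileSplitEnumerator previousEnumerator =
                                    switchContext.getPreviousEnumerator();

                            // How to obtain the end timestamp depends on the concrete enumerator;
                            // getEndTimestamp() stands in for that logic here
                            long timestamp = previousEnumerator.getEndTimestamp();

                            OffsetsInitializer offsets = OffsetsInitializer.timestamp(timestamp);
                            KafkaSource<String> kafkaSource =
                                    KafkaSource.<String>builder()
                                            .setBootstrapServers("localhost:9091")
                                            .setGroupId("MyGroup")
                                            .setTopics(Arrays.asList("quickstart-events"))
                                            .setDeserializer(
                                                    KafkaRecordDeserializationSchema.valueOnly(
                                                            StringDeserializer.class))
                                            .setStartingOffsets(offsets)
                                            .build();
                            return kafkaSource;
                        },
                        // The source does not exist yet, so its boundedness must be given explicitly
                        Boundedness.CONTINUOUS_UNBOUNDED)
                .build();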
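`OffsetsInitializer.timestamp(timestamp)` starts each Kafka partition at the first record whose timestamp is greater than or equal to the given timestamp. The plain-Java sketch below has no Flink or Kafka dependency and shows that lookup for one partition. The partition is modeled, hypothetically, as an array of record timestamps indexed by offset and assumed to be sorted. `TimestampStartSketch` and `firstOffsetAtOrAfter` are illustrative names; in a real deployment the broker resolves this lookup itself.

```java
/** Hypothetical sketch: where a timestamp-based start position lands in one partition. */
class TimestampStartSketch {

    /**
     * Returns the first offset whose record timestamp is >= startTimestamp, or
     * timestamps.length if every record is older (start at the end of the log).
     * Assumes timestamps[offset] is sorted in ascending order.
     */
    static long firstOffsetAtOrAfter(long[] timestamps, long startTimestamp) {
        int lo = 0;
        int hi = timestamps.length; // the answer lies in [lo, hi]
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < startTimestamp) {
                lo = mid + 1; // mid is too old, the answer is to the right
            } else {
                hi = mid; // mid qualifies, but an earlier offset might as well
            }
        }
        return lo;
    }
}
```

Consider record timestamps {100, 200, 200, 300} and an end timestamp of 200 reported by the file source. Reading resumes at offset 1, so both records stamped 200 are read from Kafka. If the files already contained events with that timestamp, they may be processed twice. Whether to shift the timestamp therefore depends on how the two data sets overlap and on whether downstream processing tolerates duplicates.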

Implementation

HybridSource

HybridSource is implemented following the chain-of-responsibility design pattern.

public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
    private final List<SourceListEntry> sources;

    /**
     * The boundedness of the last source in the list is used as the boundedness of the whole HybridSource.
     */
    @Override
    public Boundedness getBoundedness() {
        return sources.get(sources.size() - 1).boundedness;
    }

    @Override
    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        return new HybridSourceReader(readerContext);
    }

    @Override
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext) {
        return new HybridSourceSplitEnumerator(enumContext, sources, 0, null);
    }

    ...
}
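The `getBoundedness()` rule above works together with the builder check further down, which requires every source except the last to be bounded. Both rules can be modeled without Flink as follows. This is a simplified sketch: `HybridPlanSketch` is a hypothetical class, and its `Boundedness` enum only borrows the names of Flink's two constants.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of the HybridSource boundedness rules. */
class HybridPlanSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    private final List<Boundedness> sources = new ArrayList<>();

    /** Mirrors the builder check: a source may only follow a bounded source. */
    HybridPlanSketch addSource(Boundedness boundedness) {
        if (!sources.isEmpty() && sources.get(sources.size() - 1) != Boundedness.BOUNDED) {
            throw new IllegalArgumentException(
                    "All sources except the final source need to be bounded.");
        }
        sources.add(boundedness);
        return this;
    }

    /** Mirrors HybridSource#getBoundedness: the last source decides for the whole chain. */
    Boundedness getBoundedness() {
        if (sources.isEmpty()) {
            throw new IllegalStateException("no sources added");
        }
        return sources.get(sources.size() - 1);
    }
}
```

This is why a chain that ends with a Kafka source is treated as unbounded, while a chain of file sources stays bounded. An unbounded source in the middle is rejected, because the hybrid source would never reach the sources after it.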

The implementation also involves the following types:

  • SourceFactory: a factory that creates a concrete source
public interface SourceFactory<T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
        extends Serializable {
    SourceT create(SourceSwitchContext<FromEnumT> context);
}
  • SourceSwitchContext: gives the factory access to the enumerator of the previous source
public interface SourceSwitchContext<EnumT> {
    EnumT getPreviousEnumerator();
}
  • HybridSourceBuilder
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator> {
    private final List<SourceListEntry> sources;

    public HybridSourceBuilder() {
        sources = new ArrayList<>();
    }

    /**
     * Adds an already constructed Source to sources (wrapped in a pass-through factory).
     */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
        return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
    }

    /**
     * Adds a SourceFactory to sources; the factory creates the actual Source later,
     * when the HybridSource switches to it.
     */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, EnumT> sourceFactory, Boundedness boundedness) {
        if (!sources.isEmpty()) {
            // Only the last source may be unbounded
            Preconditions.checkArgument(
                    Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
                    "All sources except the final source need to be bounded.");
        }

        // Clean the factory (often a lambda) and check that it is serializable
        ClosureCleaner.clean(sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        sources.add(SourceListEntry.of(sourceFactory, boundedness));
        return (HybridSourceBuilder) this;
    }

    public HybridSource<T> build() {
        return new HybridSource(sources);
    }
}
  • HybridSourceTest

Demonstrates how to build a HybridSource:

@Test
public void testBuilderWithSourceFactory() {
    HybridSource.SourceFactory<Integer, Source<Integer, ?, ?>, MockSplitEnumerator> sourceFactory =
            new HybridSource.SourceFactory<Integer, Source<Integer, ?, ?>, MockSplitEnumerator>() {
                @Override
                public Source<Integer, ?, ?> create(
                        HybridSource.SourceSwitchContext<MockSplitEnumerator> context) {
                    // Get the SplitEnumerator of the previous source from the SourceSwitchContext
                    MockSplitEnumerator enumerator = context.getPreviousEnumerator();

                    // Typically, the end position read by the previous source is taken from this
                    // SplitEnumerator and used as the start position of the next source
                    return new MockBaseSource(1, 1, Boundedness.BOUNDED);
                }
            };

    HybridSource<Integer> source =
            new HybridSource.HybridSourceBuilder<Integer, MockSplitEnumerator>()
                    .<MockSplitEnumerator, Source<Integer, ?, ?>>addSource(
                            new MockBaseSource(1, 1, Boundedness.BOUNDED))
                    .addSource(sourceFactory, Boundedness.BOUNDED)
                    .build();
    assertNotNull(source);
}
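`SourceFactory` and `SourceSwitchContext` exist so that the next source is created lazily, at switch time, and can be configured from the state of the previous enumerator. The dependency-free sketch below models only that idea. `Enumerator`, `SwitchContext`, `Source`, and `SourceFactory` inside it are hypothetical stand-ins for the Flink types, and the "end position" is a plain `long`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of lazy source creation through factories. */
class LazySwitchSketch {

    /** Stand-in for a SplitEnumerator that can report where its source stopped. */
    interface Enumerator {
        long endPosition();
    }

    /** Stand-in for SourceSwitchContext. */
    interface SwitchContext {
        Enumerator getPreviousEnumerator();
    }

    /** Stand-in for a bounded Source that reads positions [startPosition, startPosition + length). */
    static final class Source {
        final String name;
        final long startPosition;
        final long length;

        Source(String name, long startPosition, long length) {
            this.name = name;
            this.startPosition = startPosition;
            this.length = length;
        }

        /** Pretends the source has been fully read and reports its (exclusive) end position. */
        Enumerator finishedEnumerator() {
            return () -> startPosition + length;
        }
    }

    /** Stand-in for SourceFactory: called only at switch time, with the switch context. */
    interface SourceFactory {
        Source create(SwitchContext context);
    }

    /** Walks the chain and returns the sources in the order they were created. */
    static List<Source> run(Source first, List<SourceFactory> factories) {
        List<Source> created = new ArrayList<>();
        Source current = first;
        created.add(current);
        for (SourceFactory factory : factories) {
            Enumerator previous = current.finishedEnumerator();
            current = factory.create(() -> previous); // the next source is built only now
            created.add(current);
        }
        return created;
    }
}
```

Deferring creation lets each later source depend on runtime information that does not exist when the job graph is built, namely the end position of the source before it.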

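The interaction between HybridSourceSplitEnumerator and HybridSourceReader, as implemented in the code below, works as follows:

  • On start(), every HybridSourceReader sends a SourceReaderFinishedEvent with its current source index (-1 on a fresh start) to the enumerator.
  • For a reader that is behind the current source, the enumerator replies with a SwitchSourceEvent carrying the next source index, the Source object, and whether it is the final source. The reader closes its previous SourceReader, then creates and starts a SourceReader for the new source.
  • When the underlying reader returns END_OF_INPUT, the HybridSourceReader sends another SourceReaderFinishedEvent. Once all readers have finished the current source, the enumerator switches to the next source's enumerator and sends a SwitchSourceEvent to every reader.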

HybridSourceSplitEnumerator

public class HybridSourceSplitEnumerator
        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {

    private final SplitEnumeratorContext<HybridSourceSplit> context;
    /**
     * The list of sources handled by the HybridSource.
     */
    private final List<HybridSource.SourceListEntry> sources;
    /**
     * Sources that have already been switched to. (Wrapping several related Maps in a
     * dedicated class like this is good object-oriented practice in everyday development.)
     */
    private final SwitchedSources switchedSources = new SwitchedSources();
    // Splits that have been returned due to subtask reset
    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
    /**
     * Subtask indices of the SourceReaders that have finished reading the current source.
     */
    private final Set<Integer> finishedReaders;
    // Source index that each reader (subtask) is currently on
    private final Map<Integer, Integer> readerSourceIndex;
    /**
     * Index of the current source.
     */
    private int currentSourceIndex;
    private HybridSourceEnumeratorState restoredEnumeratorState;
    /**
     * SplitEnumerator of the current source.
     */
    private SplitEnumerator<SourceSplit, Object> currentEnumerator;
    private SimpleVersionedSerializer<Object> currentEnumeratorCheckpointSerializer;

    @Override
    public void start() {
        switchEnumerator();
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.debug(
                "handleSourceEvent {} subtask={} pendingSplits={}",
                sourceEvent,
                subtaskId,
                pendingSplits);
        if (sourceEvent instanceof SourceReaderFinishedEvent) {

            // Handle the SourceReaderFinishedEvent sent by a HybridSourceReader
            SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent;

            int subtaskSourceIndex =
                    readerSourceIndex.computeIfAbsent(
                            subtaskId,
                            k -> {
                                // first time we see reader after cold start or recovery
                                LOG.debug(
                                        "New reader subtask={} sourceIndex={}",
                                        subtaskId,
                                        srfe.sourceIndex());
                                return srfe.sourceIndex();
                            });

            if (srfe.sourceIndex() < subtaskSourceIndex) {
                // duplicate event
                return;
            }

            if (subtaskSourceIndex < currentSourceIndex) {
                // The reader is behind the enumerator: move it on to the next source index
                subtaskSourceIndex++;

                // Send a SwitchSourceEvent to the HybridSourceReader
                sendSwitchSourceEvent(subtaskId, subtaskSourceIndex);
                return;
            }

            // track readers that have finished processing for current enumerator
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (currentSourceIndex + 1 < sources.size()) {
                    switchEnumerator();
                    // switch all readers prior to sending split assignments
                    for (int i = 0; i < context.currentParallelism(); i++) {
                        sendSwitchSourceEvent(i, currentSourceIndex);
                    }
                }
            }
        } else {
            currentEnumerator.handleSourceEvent(subtaskId, sourceEvent);
        }
    }

    private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
        readerSourceIndex.put(subtaskId, sourceIndex);

        // Look up the source to switch to
        Source source = switchedSources.sourceOf(sourceIndex);
        context.sendEventToSourceReader(
                subtaskId,
                // SwitchSourceEvent carries the source index, the Source object,
                // and whether it is the final source
                new SwitchSourceEvent(sourceIndex, source, sourceIndex >= (sources.size() - 1)));

        // send pending splits, if any
        TreeMap<Integer, List<HybridSourceSplit>> splitsBySource = pendingSplits.get(subtaskId);
        if (splitsBySource != null) {
            List<HybridSourceSplit> splits = splitsBySource.remove(sourceIndex);
            if (splits != null && !splits.isEmpty()) {
                LOG.debug("Restoring splits to subtask={} {}", subtaskId, splits);
                context.assignSplits(
                        new SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
                context.signalNoMoreSplits(subtaskId);
            }
            if (splitsBySource.isEmpty()) {
                pendingSplits.remove(subtaskId);
            }
        }

        if (sourceIndex == currentSourceIndex) {
            LOG.debug("adding reader subtask={} sourceIndex={}", subtaskId, currentSourceIndex);
            currentEnumerator.addReader(subtaskId);
        }
    }
}
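At its core, `handleSourceEvent` acts as a barrier. A reader that lags behind is moved forward one source at a time. Stale or duplicate reports are ignored. The enumerator switches to the next source only once every subtask has reported finishing the current one. The plain-Java sketch below isolates that bookkeeping and omits pending splits, recovery state, and the real enumerators. `SwitchBarrierSketch` and its methods are hypothetical names. The sketch also assumes the finished set is reset on each switch; the excerpt above does not show where that happens, presumably inside the elided `switchEnumerator()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, Flink-free model of the enumerator's SourceReaderFinishedEvent handling. */
class SwitchBarrierSketch {
    private final int parallelism;
    private final int numSources;
    private int currentSourceIndex = 0; // start() has already switched to source 0
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final Map<Integer, Integer> readerSourceIndex = new HashMap<>();

    SwitchBarrierSketch(int parallelism, int numSources) {
        this.parallelism = parallelism;
        this.numSources = numSources;
    }

    /** Models a SourceReaderFinishedEvent(sourceIndex) arriving from subtaskId. */
    void onReaderFinished(int subtaskId, int sourceIndex) {
        int subtaskSourceIndex = readerSourceIndex.computeIfAbsent(subtaskId, k -> sourceIndex);
        if (sourceIndex < subtaskSourceIndex) {
            return; // stale or duplicate event
        }
        if (subtaskSourceIndex < currentSourceIndex) {
            sendSwitch(subtaskId, subtaskSourceIndex + 1); // reader catches up by one source
            return;
        }
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < numSources) {
            currentSourceIndex++;
            finishedReaders.clear(); // assumption: the real switchEnumerator() resets this state
            for (int i = 0; i < parallelism; i++) {
                sendSwitch(i, currentSourceIndex);
            }
        }
    }

    /** Stand-in for sending a SwitchSourceEvent: records which source the reader was told to read. */
    private void sendSwitch(int subtaskId, int sourceIndex) {
        readerSourceIndex.put(subtaskId, sourceIndex);
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    Integer assignedSource(int subtaskId) {
        return readerSourceIndex.get(subtaskId);
    }
}
```

With a parallelism of 2, subtask 0 finishing source 0 does not switch anything. The switch happens only when subtask 1 also reports it, and a late duplicate report for source 0 is then ignored.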

HybridSourceReader

public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> {

    private final SourceReaderContext readerContext;
    /**
     * Sources that have already been switched to.
     */
    private final SwitchedSources switchedSources = new SwitchedSources();
    /**
     * Index of the current source; -1 on startup.
     */
    private int currentSourceIndex = -1;
    /**
     * Whether the current source is the last one.
     */
    private boolean isFinalSource;
    /**
     * SourceReader of the current source.
     */
    private SourceReader<T, ? extends SourceSplit> currentReader;
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();
    private List<HybridSourceSplit> restoredSplits = new ArrayList<>();

    @Override
    public void start() {
        int initialSourceIndex = currentSourceIndex;
        if (!restoredSplits.isEmpty()) {
            initialSourceIndex = restoredSplits.get(0).sourceIndex() - 1;
        }
        // Send a SourceReaderFinishedEvent to HybridSourceSplitEnumerator so that it
        // assigns the source this reader should start with
        readerContext.sendSourceEventToCoordinator(
                new SourceReaderFinishedEvent(initialSourceIndex));
    }

    @Override
    public InputStatus pollNext(ReaderOutput output) throws Exception {
        if (currentReader == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }

        // Keep pulling data with currentReader
        InputStatus status = currentReader.pollNext(output);

        if (status == InputStatus.END_OF_INPUT) {
            // The current source has been fully read
            LOG.info(
                    "End of input subtask={} sourceIndex={} {}",
                    readerContext.getIndexOfSubtask(),
                    currentSourceIndex,
                    currentReader);

            // Send a SourceReaderFinishedEvent to HybridSourceSplitEnumerator
            // so that it can activate the next source
            readerContext.sendSourceEventToCoordinator(
                    new SourceReaderFinishedEvent(currentSourceIndex));
            if (!isFinalSource) {
                if (availabilityFuture.isDone()) {
                    // reset to avoid continued polling
                    availabilityFuture = new CompletableFuture<>();
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
        return status;
    }

    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof SwitchSourceEvent) {
            SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
            LOG.info(
                    "Switch source event: subtask={} sourceIndex={} source={}",
                    readerContext.getIndexOfSubtask(),
                    sse.sourceIndex(),
                    sse.source());

            // Take the source index and the Source object from the SwitchSourceEvent sent by
            // HybridSourceSplitEnumerator and register them in switchedSources
            switchedSources.put(sse.sourceIndex(), sse.source());

            // Close the old reader and create and start the reader for the new source
            setCurrentReader(sse.sourceIndex());

            // Remember whether this is the last source in the list
            isFinalSource = sse.isFinalSource();
            if (!availabilityFuture.isDone()) {
                // continue polling
                availabilityFuture.complete(null);
            }
        } else {
            // Any other SourceEvent is delegated to currentReader
            currentReader.handleSourceEvents(sourceEvent);
        }
    }

    private void setCurrentReader(int index) {
        Preconditions.checkArgument(index != currentSourceIndex);
        if (currentReader != null) {
            try {
                currentReader.close();
            } catch (Exception e) {
                throw new RuntimeException("Failed to close current reader", e);
            }
            LOG.debug(
                    "Reader closed: subtask={} sourceIndex={} currentReader={}",
                    readerContext.getIndexOfSubtask(),
                    currentSourceIndex,
                    currentReader);
        }

        Source source = switchedSources.sourceOf(index);
        SourceReader<T, ?> reader;
        try {
            // Create a SourceReader from the Source received from HybridSourceSplitEnumerator
            reader = source.createReader(readerContext);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create reader", e);
        }
        // Start the new SourceReader
        reader.start();
        currentSourceIndex = index;
        currentReader = reader;
        currentReader
                .isAvailable()
                .whenComplete(
                        (result, ex) -> {
                            if (ex == null) {
                                availabilityFuture.complete(result);
                            } else {
                                availabilityFuture.completeExceptionally(ex);
                            }
                        });
        LOG.debug(
                "Reader started: subtask={} sourceIndex={} {}",
                readerContext.getIndexOfSubtask(),
                currentSourceIndex,
                reader);

        // Hand the restored splits that belong to this source to the new reader
        if (!restoredSplits.isEmpty()) {
            List<HybridSourceSplit> splits = new ArrayList<>(restoredSplits.size());
            Iterator<HybridSourceSplit> it = restoredSplits.iterator();
            while (it.hasNext()) {
                HybridSourceSplit hybridSplit = it.next();
                if (hybridSplit.sourceIndex() == index) {
                    splits.add(hybridSplit);
                    it.remove();
                }
            }
            addSplits(splits);
        }
    }
}
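On the reader side, `pollNext` and `handleSourceEvents` cooperate through `availabilityFuture`. When a non-final source ends, the reader reports to the coordinator, replaces an already completed future with a fresh one, and returns `NOTHING_AVAILABLE`, so the task stops polling. The next `SwitchSourceEvent` completes the future, which lets polling resume. The plain-Java sketch below reproduces only that hand-off with `java.util.concurrent.CompletableFuture`. `ReaderSwitchSketch`, its `Status` enum, and the integer "records" are hypothetical. Closing the old reader, restoring splits, and chaining to the inner reader's `isAvailable()` are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, Flink-free model of HybridSourceReader's availability hand-off. */
class ReaderSwitchSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private Deque<Integer> currentRecords; // stand-in for currentReader; null before the first switch
    private int currentSourceIndex = -1;
    private boolean isFinalSource;
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    final List<Integer> finishedEventsSent = new ArrayList<>(); // indices sent to the coordinator
    final List<Integer> output = new ArrayList<>();

    CompletableFuture<Void> isAvailable() {
        return availabilityFuture;
    }

    void start() {
        finishedEventsSent.add(currentSourceIndex); // -1 asks for the first source
    }

    Status pollNext() {
        if (currentRecords == null) {
            return Status.NOTHING_AVAILABLE;
        }
        Integer record = currentRecords.poll();
        if (record != null) {
            output.add(record);
            return Status.MORE_AVAILABLE;
        }
        // The underlying reader is exhausted (END_OF_INPUT): report it to the coordinator
        finishedEventsSent.add(currentSourceIndex);
        if (!isFinalSource) {
            if (availabilityFuture.isDone()) {
                availabilityFuture = new CompletableFuture<>(); // task waits until the next switch
            }
            return Status.NOTHING_AVAILABLE;
        }
        return Status.END_OF_INPUT;
    }

    /** Models handling a SwitchSourceEvent: swap in the next reader and wake the task up. */
    void onSwitch(int sourceIndex, List<Integer> records, boolean finalSource) {
        currentSourceIndex = sourceIndex;
        currentRecords = new ArrayDeque<>(records);
        isFinalSource = finalSource;
        if (!availabilityFuture.isDone()) {
            availabilityFuture.complete(null); // continue polling
        }
    }
}
```

Only the last source may surface `END_OF_INPUT`. For every earlier source, the end of input is turned into a pause that lasts until the enumerator has collected reports from all readers.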

References

FLIP-150: Introduce Hybrid Source
Flink Hybrid Source: motivation and implementation principles
FLIP-150 discussion thread