Flink-FLIP-27: Refactor Source Interface

FLIP-27: Refactor Source Interface ,是对 SourceFunction 的重构,旨在解决 SourceFunction 中存在的几个痛点。SplitEnumerator 负责发现并 split,SourceReader 负责读取 split 的实际数据。也是批流一体 API 推进的产物。

重构动机

先前的 SourceFunction 存在以下几个痛点:

  • split 的发现逻辑(work discovery)和实际读取数据的逻辑耦合在 SourceFunction 和 DataStream 接口中,导致 sourcce 实现的复杂度
  • 批处理和流处理需要实现不同的 source
  • partitions/shards/splits 等概念没有在接口中显示定义,使得很难以独立于源的方式实现事件时间对齐、分区 watermark 、动态 split 分配 、work stealing 等功能
  • checkpoint 锁由 SourceFunction 占有,导致框架难以优化
  • 没有通用框架,意味着每个 source 都要实现一个复杂的线程模型,增加了新 source 实现及测试的难度

总体设计

发现读取分离

Source 端有两个主要组件:

  • SplitEnumerator: 发现并分配 split (files, partitions 等)
    SplitEnumerator 仅运行一次,非并行的,未来可以考虑并行化。通常运行在 JobManager 上,或者作为 TaskManager 上的单任务。
    在 File Source 中, SplitEnumerator 列举出所有文件;
    在 Kafka Source 中,SplitEnumerator 查询出 kafka 需要读取的所有分区;
  • Reader: 读取 splits 中的真实数据
    从分配到的 splits 中读取数据。可以一个接一个的读取有界 splits ,也可以并发读取多个 splits。

这两个组件组成了核心功能,主要的 Source 接口是一个创建 split enumerators 和 readers 的工厂:

统一批流 API

任意 source 都应该既能作为 batch source ,也能作为 streaming source 。有界性是 source 接口的内在属性,大多数情况下,仅 SplitEnumerator 需要识别有界性属性,而 SplitReaders 不需要。

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);

通用 enumerator-reader 通信机制

SplitEnumerator 和 SourceReader 都有各自的实现类,两个组件之间需要通信,在二者之间引入了一种通用的消息传递机制。
需要在 JobMasterGateway 和 TaskExecutorGateway 中分别实现 RPC 方法。消息传递栈如下图:

SourceCoordinator 和 SourceOperator 作为上图中 OperatorCoordinator 和 Operator 针对 FLIP-27 的实现,其类图和时序图如下:

SourceEvent 是在 SplitEnumerator 和 SourceReader 之间传递消息的接口,OperatorEvent 是在 OperatorCoordinator 和 Operator 之间传递消息的接口。
在这个 FLIP 中,SourceCoordinator 将是封装 SplitEnumerator 的 OperatorCoordinator 的实现。

1
2
3
4
5
6
7
8
public interface JobMasterGateway {
...
CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
ExecutionAttemptID task,
OperatorID operatorID,
SerializedValue<OperatorEvent> event);
...
}
1
2
3
4
5
6
7
8
9
10
11
public interface TaskExecutorGateway extends RpcGateway {

...

CompletableFuture<Acknowledge> sendOperatorEvent(
ExecutionAttemptID task,
OperatorID operator,
SerializedValue<OperatorEvent> evt);

...
}

SplitEnumerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
extends AutoCloseable, CheckpointListener {


void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);


void addSplitsBack(List<SplitT> splits, int subtaskId);

/**
* 根据 subTaskId 添加一个新的 source reader
*/
void addReader(int subtaskId);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

/**
* 发送 source event 到 source reader
*/
void sendEventToSourceReader(int subtaskId, SourceEvent event);

/**
* 获取已经注册的 source reader 信息
*/
Map<Integer, ReaderInfo> registeredReaders();

/**
* 分配 splits
* 根据 subTaskId 构建 gateway ,并下发 SourceSplits
*/
void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);
}
1
2
3
4
5
6
7
8
public final class SplitsAssignment<SplitT extends SourceSplit> {

/**
* subTaskId, SourceSplit 列表
*/
private final Map<Integer, List<SplitT>> assignment;

}

SourceReader 抽象

首先,看下 Flink 的 Source 核心接口被设计地非常通用,但是 Reader 的实现复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

/**
* 获取 source 的有界性,BOUNDED | CONTINUOUS_UNBOUNDED
*/
Boundedness getBoundedness();

/**
* 创建 reader
*/
SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

/**
* 创建 enumerator
*/
SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);

}

因此,在 FLIP-27 中封装了一个抽象类来提供更简单的接口以允许阻塞调用,SourceReaderBase 作为 SourceReader 的一个抽象实现,基于生产者消费者模式,提供了主线程和内部读取线程之间的同步机制。用户只需专注于:

  • 自定义 SourceReader (继承 SourceReaderBase)
  • 从外部系统获取记录(实现 SplitReader 接口)
  • 发送数据到下游 (实现 RecordEmitter 接口)
  • watermark 相关处理

SourceReaderBase

SourceReaderBase 的工作流程如下图:

  1. 当 SplitEnumerator 将一个新的 split 分配给 SourceReader ,SourceReader 先为该 split 初始化 state ,再经过 SplitFetcherManager -> SplitFetcher -> SplitReader 分配到指定的 SplitReader
  2. 外部数据经过 SplitReader -> SplitFetcher.elementQueues -> SourceReaderBase.elementQueues -> RecordsWithSplitIds -> RecordEmitter ,数据批形式地入队列出队列,性能更好
  3. SourceReaderBase 遍历每一条数据,并查询数据对应的 split state ,二者会通过 RecordEmitter 发送到下游

SourceReaderBase 类的关键属性和方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {

// ==============================================================================
/**
* 生产者消费者模式的阻塞队列,存放 SplitFetcher 线程从外部系统读取的数据
*/
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

/**
* SplitFetcher 线程池管理
*/
protected final SplitFetcherManager<E, SplitT> splitFetcherManager;

/**
* SplitReader最新读取的records-by-split批
*/
RecordsWithSplitIds<E> currentFetch;

/**
* 当前遍历处理到的 Output
*/
private SourceOutput<T> currentSplitOutput;

/**
* 处理 SplitReaders 读取的数据
*/
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;

/**
* source reader 上下文
*/
protected SourceReaderContext context;

/**
* splits 的状态后端
*/
private final Map<String, SplitContext<T, SplitStateT>> splitStates;


// ==============================================================================

/**
* 拉取下一批数据
*/
public InputStatus pollNext(ReaderOutput<T> output) {

}

/**
* 被分配到新一批 splits
*/
public void addSplits(List<SplitT> splits) {
}
}

SplitReader

大多数 readers 可以分为以下几类:

  • 顺序单 split (file,数据库查询,大多数有界 splits)
  • 多 split 多路 (Kafka,Pulsar,Pravega…)
  • 多 split 多线程 (Kinesis…)

SourceReader 实现如下接口中的方法,fetch 和 handleSplitsChanges 需要在相同线程中执行,无需在 connector 中进行任何并发处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface SplitReader<E, SplitT extends SourceSplit> {

/**
* 从外部系统读取记录,转换数据格式,写入到 blocking queue,传递给 {@link RecordEmitter}
*/
RecordsWithSplitIds<E> fetch() throws InterruptedException;

/**
* 处理 split change
*/
void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

/*
* fetcher 线程阻塞时唤醒 split reader
*/
void wakeUp();
}

参考资料

FLIP-27: Refactor Source Interface
Flink CDC 2.0实现原理剖析