Flink源码剖析-flink-table-runtime-blink_TopN

本文将基于 flink release-1.11 源码,简单分析下 TopN function 的实现。

AbstractTopNFunction

AbstractTopNFunction 中有如下属性,定义了 sortKey 的 selector 与 comparator,以及 rankEnd 相关的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

// we set default topN size to 100
private static final long DEFAULT_TOPN_SIZE = 100;

/**
 * Code-generated sort key comparator. It is instantiated in {@code open()} and then set to
 * {@code null}, because only the instantiated {@link #sortKeyComparator} is needed afterwards.
 */
private GeneratedRecordComparator generatedSortKeyComparator;
/**
 * Comparator that orders rows by their sort key.
 */
protected Comparator<RowData> sortKeyComparator;

/**
 * Whether UPDATE_BEFORE (-U) messages are emitted alongside UPDATE_AFTER (+U).
 */
private final boolean generateUpdateBefore;
/**
 * Whether a rank-number column is appended to each output row.
 */
protected final boolean outputRankNumber;
/**
 * Type information of the input rows; used to look up the type of the rank-end column.
 */
protected final RowDataTypeInfo inputRowType;
/**
 * Extracts the sort key from an input row.
 */
protected final KeySelector<RowData, RowData> sortKeySelector;

/**
 * Gives access to the partition key of the record currently being processed.
 */
protected KeyContext keyContext;
/**
 * Whether the rank end is a constant; if false, it is read from a column of the input rows.
 */
private final boolean isConstantRankEnd;
/**
 * Start of the rank range.
 */
private final long rankStart;
/**
 * Index of the input-row column holding the rank end (used only for a variable rank end).
 */
private final int rankEndIndex;
/**
 * Current rank end: the constant value, or the value last resolved by initRankEnd().
 */
protected long rankEnd;
/**
 * Reads the rank end from column {@code rankEndIndex} of a row, widening INT/SMALLINT to long.
 * Created in {@code open()} for a variable rank end.
 */
private transient Function<RowData, Long> rankEndFetcher;

/**
 * State holding the rank end taken from the first row seen while the state is empty; only
 * registered when the rank end is not constant.
 */
private ValueState<Long> rankEndState;
/**
 * Counts rows whose rank end differs from the stored one (metric "topn.invalidTopSize").
 */
private Counter invalidCounter;
/**
 * Reused output row that joins an input row with a one-field rank row when rank numbers are
 * output.
 */
private JoinedRowData outputRow;

// metrics
// Cache statistics: requestCount counts buffer lookups, hitCount the lookups served from heap.
protected long hitCount = 0L;
protected long requestCount = 0L;

AbstractTopNFunction 的 open() 方法在 rankEnd 非常量时注册并获取 rankEndState,实例化 sortKey 比较器,并根据 rankEnd 列的类型初始化 rankEndFetcher 等类属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

/**
 * Initializes per-task resources.
 *
 * <p>Registers the cleanup-time state and creates the reusable output row. For a variable rank
 * end it also obtains {@code rankEndState}. It then instantiates the generated sort key
 * comparator and registers the invalid-top-size counter. Finally, for a variable rank end, it
 * creates {@code rankEndFetcher} according to the rank-end column type.
 *
 * @param parameters configuration passed to the function
 * @throws UnsupportedOperationException if the variable rank-end column is not BIGINT, INTEGER
 *     or SMALLINT
 * @throws Exception if the parent open or state access fails
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    initCleanupTimeState("RankFunctionCleanupTime");
    outputRow = new JoinedRowData();

    if (!isConstantRankEnd) {
        // Variable rank end: obtain the state that remembers the rank end (see initRankEnd()).
        ValueStateDescriptor<Long> rankStateDesc = new ValueStateDescriptor<>("rankEnd", Types.LONG);
        rankEndState = getRuntimeContext().getState(rankStateDesc);
    }
    // compile comparator
    sortKeyComparator = generatedSortKeyComparator.newInstance(getRuntimeContext().getUserCodeClassLoader());
    // Only the instantiated comparator is needed from here on.
    generatedSortKeyComparator = null;
    invalidCounter = getRuntimeContext().getMetricGroup().counter("topn.invalidTopSize");

    // initialize rankEndFetcher
    if (!isConstantRankEnd) {
        LogicalType rankEndIdxType = inputRowType.getLogicalTypes()[rankEndIndex];
        switch (rankEndIdxType.getTypeRoot()) {
            case BIGINT:
                rankEndFetcher = (RowData row) -> row.getLong(rankEndIndex);
                break;
            case INTEGER:
                rankEndFetcher = (RowData row) -> (long) row.getInt(rankEndIndex);
                break;
            case SMALLINT:
                rankEndFetcher = (RowData row) -> (long) row.getShort(rankEndIndex);
                break;
            default:
                // INT and SMALLINT are accepted above, so both the log line and the exception
                // list all three supported types.
                String typeName = rankEndIdxType.getClass().getName();
                LOG.error("variable rank index column must be long, short or int type, while input type is {}",
                        typeName);
                throw new UnsupportedOperationException(
                        "variable rank index column must be long, short or int type, while input type is "
                                + typeName);
        }
    }
}

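AbstractTopNFunction 的 initRankEnd() 方法确定当前使用的 rankEnd:rankEnd 为常量时直接返回;否则在状态为空时从 input row 中读取 rankEnd 并写入 rankEndState,之后始终使用状态中的值,若 input row 中的值与之不同,则累加 invalidCounter 计数: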

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

/**
 * Resolves the rank end to use for {@code row} and stores it in {@code rankEnd}.
 *
 * <p>With a constant rank end the configured value is returned unchanged. Otherwise the first
 * row seen while {@code rankEndState} is empty fixes the rank end, and its value is written to
 * the state. For later rows the stored value wins; a row that carries a different value only
 * increments the invalid counter.
 *
 * @param row input record
 * @return rank end
 * @throws Exception if accessing the rank end state fails
 */
protected long initRankEnd(RowData row) throws Exception {
    if (isConstantRankEnd) {
        return rankEnd;
    }

    final Long storedRankEnd = rankEndState.value();
    final long rowRankEnd = rankEndFetcher.apply(row);
    if (storedRankEnd != null) {
        rankEnd = storedRankEnd;
        if (rowRankEnd != rankEnd) {
            // increment the invalid counter when the current rank end not equal to previous rank end
            invalidCounter.inc();
        }
    } else {
        // First rank end for this state: remember it.
        rankEnd = rowRankEnd;
        rankEndState.update(rankEnd);
    }
    return rankEnd;
}

AbstractTopNFunction 的 checkSortKeyInBufferRange() 方法用来判断 input row 是否应该被放到其 key 对应的 TopNBuffer 中:

  1. 若 TopNBuffer 为空,直接放入;否则将 input row 的 sortKey 与 TopNBuffer 中最后一个(即最差的)entry 的 key 比较,比较结果小于 0(排在其之前)则放入 TopNBuffer;
  2. 否则,若 TopNBuffer 当前的记录数仍小于 getDefaultTopNSize() 返回的大小,也将 input row 放入 TopNBuffer。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
     * Checks whether the record should be put into the buffer.
     *
     * <p>The record is accepted when the buffer is empty, or when its sort key sorts strictly
     * before the buffer's last (worst) sort key. It is also accepted while
     * {@code buffer.getCurrentTopNum()} is still below {@code getDefaultTopNSize()}.
     *
     * @param sortKey sortKey to test
     * @param buffer buffer to add
     * @return true if the record should be put into the buffer.
     */
    protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer) {
        Comparator<RowData> keyComparator = buffer.getSortKeyComparator();
        Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry();
        if (lastEntry == null) {
            // Empty buffer: always accept.
            return true;
        }
        boolean sortsBeforeWorst = keyComparator.compare(sortKey, lastEntry.getKey()) < 0;
        // Not better than the worst entry: accept only while the buffer is below the default size.
        return sortsBeforeWorst || buffer.getCurrentTopNum() < getDefaultTopNSize();
    }

AbstractTopNFunction 的 createOutputRow() 方法用于构建 output row:需要输出排位序号时,将 input row 与只包含 rank 的一行拼接后输出;否则直接修改 input row 的 RowKind 后返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  /**
* 构建 output row
*
* @param inputRow input row
* @param rank 排位序号
* @param rowKind 描述一行 changelog 的行为种类
* @return {@link RowData}
*/
private RowData createOutputRow(RowData inputRow, long rank, RowKind rowKind) {
if (outputRankNumber) {
// 需要输出 rank number
GenericRowData rankRow = new GenericRowData(1);
// 第 0 个字段设置为排位序号,将 rank 专门放置在一个 RowData 中
rankRow.setField(0, rank);

outputRow.replace(inputRow, rankRow);
outputRow.setRowKind(rowKind);
return outputRow;
} else {
inputRow.setRowKind(rowKind);
return inputRow;
}
}

AppendOnlyTopNFunction

AppendOnlyTopNFunction 中有如下属性,状态后端 MapState 和本地堆内存 TopNBuffer 结合使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   /**
 * Type information of the sort key; used as the key type of {@link #dataState}.
 */
private final RowDataTypeInfo sortKeyType;
/**
 * Serializer for input rows; used to copy a row before it is stored in the buffer.
 */
private final TypeSerializer<RowData> inputRowSer;
/**
 * Total cache budget; divided by the TopN size in open() to size the LRU map of buffers.
 */
private final long cacheSize;

// a map state stores mapping from sort key to records list which is in topN
private transient MapState<RowData, List<RowData>> dataState;

// the buffer stores mapping from sort key to records list, a heap mirror to dataState
/**
 * TopNBuffer of the partition key currently being processed (set by initHeapStates()).
 */
private transient TopNBuffer buffer;

// the kvSortedMap stores mapping from partition key to it's buffer
/**
 * Heap cache: partition key -> TopNBuffer of that key.
 */
private transient Map<RowData, TopNBuffer> kvSortedMap;

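AppendOnlyTopNFunction 的 open() 方法根据 cacheSize 初始化按 key 缓存 TopNBuffer 的 LRU map,并注册用于保存 TopN 数据的 MapState(open() 本身并不读取数据,从状态恢复 TopNBuffer 是在 initHeapStates() 中按 key 惰性完成的):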

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Creates the per-key heap cache of TopN buffers and registers the TopN data state.
 *
 * <p>No data is read here; a key's buffer is restored from {@code dataState} in
 * initHeapStates() the first time that key is processed while not cached.
 *
 * @param parameters configuration passed to the function
 * @throws Exception if the parent open or state registration fails
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // Number of partition keys whose buffer is kept on heap:
    // total cache size / size of one TopN buffer, but at least 1.
    int lruCacheSize = Math.max(1, (int) (cacheSize / getDefaultTopNSize()));
    // partition key -> TopNBuffer (LRUMap, presumably evicting least-recently-used keys)
    kvSortedMap = new LRUMap<>(lruCacheSize);
    LOG.info("Top{} operator is using LRU caches key-size: {}", getDefaultTopNSize(), lruCacheSize);

    // Register the map state holding sort key -> rows in the TopN for the current key.
    ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(inputRowType);
    MapStateDescriptor<RowData, List<RowData>> mapStateDescriptor = new MapStateDescriptor<>(
            "data-state-with-append", sortKeyType, valueTypeInfo);
    dataState = getRuntimeContext().getMapState(mapStateDescriptor);

    // metrics
    // NOTE(review): kvSortedMap was created just above and is still empty, so this argument is
    // always 0. Confirm whether registerMetric is meant to receive the cache capacity instead.
    registerMetric(kvSortedMap.size() * getDefaultTopNSize());
}

AppendOnlyTopNFunction 的 processElement() 方法处理数据,判断当前 input row 是否可以丢到 TopNBuffer 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Processes one append-only input row for the current partition key.
 *
 * <p>Refreshes the cleanup timer, prepares the heap buffer and the rank end, and drops rows
 * rejected by checkSortKeyInBufferRange(). An accepted row is copied into the buffer, and its
 * sort key's rows are mirrored into {@code dataState}. The changes are then emitted using either
 * the with-row-number or the without-row-number algorithm.
 */
@Override
public void processElement(RowData input, Context context, Collector<RowData> out) throws Exception {
    // register state-cleanup timer
    long now = context.timerService().currentProcessingTime();
    registerProcessingCleanupTimer(context, now);

    // Make the heap buffer and the rank end ready for the current partition key.
    initHeapStates();
    initRankEnd(input);

    RowData sortKey = sortKeySelector.getKey(input);
    if (!checkSortKeyInBufferRange(sortKey, buffer)) {
        // Not accepted into the buffer: nothing is stored or emitted.
        return;
    }

    // A copy of the input is stored, not the input object itself.
    buffer.put(sortKey, inputRowSer.copy(input));
    // Write a fresh (shallow) list so the buffer's collection is never shared with state
    // (see CopyOnWriteStateMap); RowData elements are not updated, so no deep copy is needed.
    Collection<RowData> rowsOfSortKey = buffer.get(sortKey);
    dataState.put(sortKey, new ArrayList<>(rowsOfSortKey));

    // The without-number algorithm cannot handle an offset, so offsets also go through the
    // with-number algorithm.
    boolean useRowNumberAlgorithm = outputRankNumber || hasOffset();
    if (useRowNumberAlgorithm) {
        processElementWithRowNumber(sortKey, input, out);
    } else {
        processElementWithoutRowNumber(input, out);
    }
}

AppendOnlyTopNFunction 的 initHeapStates() 是在处理 input row 之前,在堆内存中初始化 TopNBuffer,并将状态后端存储的 TopN list 设置到 TopNBuffer 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Points {@code buffer} at the TopNBuffer of the current partition key.
 *
 * <p>On a cache hit the cached buffer is used and {@code hitCount} is incremented. On a miss a
 * new buffer is created, cached in {@code kvSortedMap}, and filled from every entry of
 * {@code dataState}. {@code requestCount} is incremented on every call.
 */
private void initHeapStates() throws Exception {
    requestCount += 1;
    final RowData partitionKey = (RowData) keyContext.getCurrentKey();
    buffer = kvSortedMap.get(partitionKey);
    if (buffer != null) {
        // Served from the heap cache.
        hitCount += 1;
        return;
    }

    // Cache miss: create this key's buffer and restore it from state.
    buffer = new TopNBuffer(sortKeyComparator, ArrayList::new);
    kvSortedMap.put(partitionKey, buffer);
    final Iterator<Map.Entry<RowData, List<RowData>>> stateEntries = dataState.iterator();
    if (stateEntries == null) {
        return;
    }
    while (stateEntries.hasNext()) {
        final Map.Entry<RowData, List<RowData>> sortKeyAndRows = stateEntries.next();
        // the order is preserved
        buffer.putAll(sortKeyAndRows.getKey(), sortKeyAndRows.getValue());
    }
}

AppendOnlyTopNFunction 的 processElementWithoutRowNumber() 方法处理已放入 TopNBuffer 的 input row:若 TopNBuffer 中的记录数超过 rankEnd,则淘汰最后一条记录;被淘汰的若与 input row 相同,则不输出任何消息,否则先为被淘汰的记录输出 DELETE;最后为 input row 输出 INSERT:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Emits the changes for the variant without rank numbers (and without offset).
 *
 * <p>Called after {@code input} has been put into {@code buffer} and {@code dataState}. If the
 * buffer now holds more than {@code rankEnd} records, its last record is evicted from both the
 * buffer and {@code dataState}. If the evicted record equals {@code input}, nothing is emitted;
 * otherwise a DELETE is emitted for the evicted record. Finally an INSERT is emitted for
 * {@code input}.
 *
 * @param input the record just put into the buffer
 * @param out collector receiving the emitted changes
 */
private void processElementWithoutRowNumber(RowData input, Collector<RowData> out) throws Exception {
    // remove retired element
    // The buffer grew past the rank end: evict its last record.
    if (buffer.getCurrentTopNum() > rankEnd) {
        Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry();
        RowData lastKey = lastEntry.getKey();
        Collection<RowData> lastList = lastEntry.getValue();
        // Read the evicted record and the list size before removing anything:
        // removeLast() below removes the element from lastList itself.
        RowData lastElement = buffer.lastElement();
        int size = lastList.size();
        // remove last one
        if (size <= 1) {
            // The last sort key holds at most this one record: drop the whole entry.
            buffer.removeAll(lastKey);
            dataState.remove(lastKey);
        } else {
            // Remove only the last record of the last sort key.
            buffer.removeLast();
            // last element has been removed from lastList, we have to copy a new collection
            // for lastList to avoid mutating state values, see CopyOnWriteStateMap,
            // otherwise, the result might be corrupt.
            // don't need to perform a deep copy, because RowData elements will not be updated
            dataState.put(lastKey, new ArrayList<>(lastList));
        }
        if (size == 0 || input.equals(lastElement)) {
            // The evicted record is the input itself (or the entry was empty): emit nothing.
            return;
        } else {
            // lastElement shouldn't be null
            collectDelete(out, lastElement);
        }
    }
    // it first appears in the TopN, send INSERT message
    collectInsert(out, input);
}

AppendOnlyTopNFunctionTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
 * Tests for {@link AppendOnlyTopNFunction}.
 */
public class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase {

    @Override
    protected AbstractTopNFunction createFunction(RankType rankType, RankRange rankRange,
            boolean generateUpdateBefore, boolean outputRankNumber) {
        return new AppendOnlyTopNFunction(minTime.toMilliseconds(),
                maxTime.toMilliseconds(),
                inputRowType,
                sortKeyComparator,
                sortKeySelector,
                rankType,
                rankRange,
                generateUpdateBefore,
                outputRankNumber,
                cacheSize);
    }

    @Test
    public void testVariableRankRange() throws Exception {
        AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER,
                // The rank end is read from column 1 (the second field) of each input row.
                new VariableRankRange(1),
                true,
                // Do not append a rank-number column to the output.
                false);
        // Wrap the function in a keyed operator test harness.
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
        testHarness.open();

        // key "book", rank end 2: 12 enters the top 2
        testHarness.processElement(insertRecord("book", 2L, 12));
        // key "book", rank end 2: 19 enters the top 2
        testHarness.processElement(insertRecord("book", 2L, 19));
        // key "book", rank end 2: 11 sorts first, so 19 drops out (-D) and 11 is inserted (+I)
        testHarness.processElement(insertRecord("book", 2L, 11));
        // key "fruit", rank end 1: 33 enters the top 1
        testHarness.processElement(insertRecord("fruit", 1L, 33));
        // key "fruit", rank end 1: 44 sorts after 33, so it is not in the top 1; nothing is emitted
        testHarness.processElement(insertRecord("fruit", 1L, 44));
        // key "fruit", rank end 1: 22 sorts first, so 33 drops out (-D) and 22 is inserted (+I)
        testHarness.processElement(insertRecord("fruit", 1L, 22));
        testHarness.close();

        // Print the produced changelog for inspection.
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (Object o : output) {
            StreamRecord<?> streamRecord = (StreamRecord<?>) o;
            System.out.println("Output element -> " + streamRecord.getValue());
        }

        List<Object> expectedOutput = new ArrayList<>();
        // ("book", 2L, 12)
        expectedOutput.add(insertRecord("book", 2L, 12));
        // ("book", 2L, 19)
        expectedOutput.add(insertRecord("book", 2L, 19));
        // ("book", 2L, 11)
        expectedOutput.add(deleteRecord("book", 2L, 19));
        expectedOutput.add(insertRecord("book", 2L, 11));
        // ("fruit", 1L, 33)
        expectedOutput.add(insertRecord("fruit", 1L, 33));
        // ("fruit", 1L, 44): no output
        // ("fruit", 1L, 22)
        expectedOutput.add(deleteRecord("fruit", 1L, 33));
        expectedOutput.add(insertRecord("fruit", 1L, 22));
        assertorWithoutRowNumber
                .assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
abstract class TopNFunctionTestBase {

    // Test stand-in for a code-generated sort key comparator: it carries no generated code, and
    // newInstance() ignores the class loader and always returns IntRecordComparator.INSTANCE.
    static GeneratedRecordComparator sortKeyComparator = new GeneratedRecordComparator("", "", new Object[0]) {

        private static final long serialVersionUID = 1434685115916728955L;

        @Override
        public RecordComparator newInstance(ClassLoader classLoader) {

            // NOTE(review): IntRecordComparator is defined elsewhere; presumably it compares field 0
            // of the two sort-key rows as ints in ascending order (the tests expect 11 < 12 < 19).
            return IntRecordComparator.INSTANCE;
        }
    };
}
1
2
3
4
5
6
7
8
abstract class TopNFunctionTestBase {

    // Index of the input-row column used as the sort key.
    private int sortKeyIdx = 2;

    // Key selector that extracts column sortKeyIdx (the third field) of an input row as the sort key.
    BinaryRowDataKeySelector sortKeySelector = new BinaryRowDataKeySelector(new int[]{sortKeyIdx},
            inputRowType.getLogicalTypes());
}

输出结果为:

1
2
3
4
5
6
7
Output element -> +I(book,2,12)   
Output element -> +I(book,2,19)
Output element -> -D(book,2,19)
Output element -> +I(book,2,11)
Output element -> +I(fruit,1,33)
Output element -> -D(fruit,1,33)
Output element -> +I(fruit,1,22)

RetractableTopNFunction

内部使用按 sortKey 排序的 TreeMap 维护 TopN,能够处理撤回消息(例如 RowKind.UPDATE_BEFORE,即 -U)。

RetractableTopNFunction 中有如下属性:dataState 记录每个 sortKey 对应的 RowData 列表,treeMap 则保存一个按 sortKey 排序的 sortedMap,记录每个 sortKey 的记录数,用于 TopN 排序:

1
2
3
4
5
6
7
8
9
10
11
// a map state stores mapping from sort key to records list
/**
 * sort key -> list of the records with that sort key, in the order they were appended.
 */
private transient MapState<RowData, List<RowData>> dataState;

// a sorted map stores mapping from sort key to records count
/**
 * sort key -> number of records with that sort key, kept in a map ordered by the sort key
 * comparator.
 */
private transient ValueState<SortedMap<RowData, Long>> treeMap;

RetractableTopNFunction 中的 processElement() 方法,按照数据的 RowKind 分别执行 emit 和 retract 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
 * Processes one accumulate or retract message for the current partition key.
 *
 * <p>The row kind of {@code input} is reset to INSERT before any state access.
 * <ul>
 *   <li>Accumulate: the sort key's count in {@code sortedMap} is incremented, the rank changes
 *       are emitted, and then the record is appended to {@code dataState}.</li>
 *   <li>Retract: the rank changes are emitted first, and then the sort key's count is
 *       decremented.</li>
 * </ul>
 * In both cases the updated {@code sortedMap} is written back to {@code treeMap}.
 */
@Override
public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
    long currentTime = ctx.timerService().currentProcessingTime();
    // register state-cleanup timer
    registerProcessingCleanupTimer(ctx, currentTime);
    initRankEnd(input);

    // Load sort key -> record count; if nothing is stored yet, start with an empty map ordered
    // by the sort key comparator.
    SortedMap<RowData, Long> sortedMap = treeMap.value();
    if (sortedMap == null) {
        sortedMap = new TreeMap<>(sortKeyComparator);
    }

    RowData sortKey = sortKeySelector.getKey(input);
    // Presumably true for INSERT / UPDATE_AFTER and false for UPDATE_BEFORE / DELETE.
    boolean isAccumulate = RowDataUtil.isAccumulateMsg(input);
    // erase row kind for further state accessing
    input.setRowKind(RowKind.INSERT);

    if (isAccumulate) {
        // update sortedMap: one more record with this sort key
        if (sortedMap.containsKey(sortKey)) {
            sortedMap.put(sortKey, sortedMap.get(sortKey) + 1);
        } else {
            sortedMap.put(sortKey, 1L);
        }

        // emit
        // Runs after sortedMap counts the new record but before it is appended to dataState.
        if (outputRankNumber || hasOffset()) {
            // the without-number-algorithm can't handle topN with offset,
            // so use the with-number-algorithm to handle offset
            emitRecordsWithRowNumber(sortedMap, sortKey, input, out);
        } else {
            emitRecordsWithoutRowNumber(sortedMap, sortKey, input, out);
        }

        // update data state: append the record to the list of its sort key
        List<RowData> inputs = dataState.get(sortKey);
        if (inputs == null) {
            // the sort key is never seen
            inputs = new ArrayList<>();
        }
        inputs.add(input);
        dataState.put(sortKey, inputs);
    } else {
        // Emit the rank changes first, while sortedMap still counts the retracted record.
        // dataState is not updated by this branch itself; removing the record from it is left
        // to the retract methods called here.
        if (outputRankNumber || hasOffset()) {
            // the without-number-algorithm can't handle topN with offset,
            // so use the with-number-algorithm to handle offset
            retractRecordWithRowNumber(sortedMap, sortKey, input, out);
        } else {
            retractRecordWithoutRowNumber(sortedMap, sortKey, input, out);
        }

        // and then update sortedMap
        if (sortedMap.containsKey(sortKey)) {
            long count = sortedMap.get(sortKey) - 1;
            if (count == 0) {
                sortedMap.remove(sortKey);
            } else {
                sortedMap.put(sortKey, count);
            }
        } else {
            // Retracting a sort key that is not counted.
            if (sortedMap.isEmpty()) {
                // Presumably the state was cleared (see STATE_CLEARED_WARN_MSG):
                // tolerated in lenient mode, an error otherwise.
                if (lenient) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                } else {
                    throw new RuntimeException(STATE_CLEARED_WARN_MSG);
                }
            } else {
                throw new RuntimeException(
                        "Can not retract a non-existent record. This should never happen.");
            }
        }

    }
    // Persist the updated counts.
    treeMap.update(sortedMap);
}

RetractableTopNFunction 中的 emitRecordsWithRowNumber() 方法在累积(插入)一条记录后,输出该记录以及因其插入而排位后移的记录的排位变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Emits the rank changes caused by inserting {@code inputRow}. Used when rank numbers are output
 * or an offset is set.
 *
 * <p>Called by processElement after {@code sortedMap} already counts {@code inputRow} but before
 * the row is appended to {@code dataState}. Sort keys are walked in order while the running rank
 * is within the rank end:
 * <ul>
 *   <li>Keys before {@code sortKey} only advance the rank.</li>
 *   <li>At {@code sortKey} the rank advances by that key's count. This is the new row's rank,
 *       since it is appended last among rows with the same key. The new row becomes the pending
 *       row.</li>
 *   <li>For each record of a later key, an UPDATE_BEFORE is emitted for that record at the
 *       current rank and an UPDATE_AFTER for the pending row at the same rank. The displaced
 *       record then becomes the pending row for the next rank.</li>
 * </ul>
 * If the walk ends while still inside the rank end, the pending row is emitted as an INSERT at
 * the current rank.
 *
 * @param sortedMap sort key -> number of records with that sort key, in sort order
 * @param sortKey sort key of the inserted record
 * @param inputRow the inserted record
 * @param out collector receiving the emitted changes
 * @throws RuntimeException if a needed dataState entry is missing and lenient mode is off
 */
private void emitRecordsWithRowNumber(
        SortedMap<RowData, Long> sortedMap, RowData sortKey, RowData inputRow, Collector<RowData> out)
        throws Exception {
    Iterator<Map.Entry<RowData, Long>> iterator = sortedMap.entrySet().iterator();
    long currentRank = 0L;
    // The pending row that still needs a rank; becomes inputRow once its sort key is reached.
    RowData currentRow = null;
    boolean findsSortKey = false;
    while (iterator.hasNext() && isInRankEnd(currentRank)) {
        Map.Entry<RowData, Long> entry = iterator.next();
        RowData key = entry.getKey();
        if (!findsSortKey && key.equals(sortKey)) {
            // Reached the inserted record's sort key: its count includes the new row.
            currentRank += entry.getValue();
            currentRow = inputRow;
            findsSortKey = true;
        } else if (findsSortKey) {
            List<RowData> inputs = dataState.get(key);
            if (inputs == null) {
                // Skip the data if it's state is cleared because of state ttl.
                if (lenient) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                } else {
                    throw new RuntimeException(STATE_CLEARED_WARN_MSG);
                }
            } else {
                int i = 0;
                while (i < inputs.size() && isInRankEnd(currentRank)) {
                    RowData prevRow = inputs.get(i); // record that held this rank so far
                    collectUpdateBefore(out, prevRow, currentRank);
                    collectUpdateAfter(out, currentRow, currentRank); // pending row takes this rank
                    currentRow = prevRow; // displaced record becomes the pending row
                    currentRank += 1;
                    i++;
                }
            }
        } else {
            // Sort keys before the inserted record's key are unaffected.
            currentRank += entry.getValue();
        }
    }
    if (isInRankEnd(currentRank)) {
        // there is no enough elements in Top-N, emit INSERT message for the new record.
        collectInsert(out, currentRow, currentRank);
    }
}

RetractableTopNFunction 中的 retractRecordWithRowNumber() 方法将被撤回的行从 dataState 中移除,并输出其后各行排位前移所带来的更新(sortedMap 中的计数随后由 processElement() 递减):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * Emits the rank changes caused by retracting {@code inputRow} and removes the retracted record
 * from {@code dataState}. Used when rank numbers are output or an offset is set.
 *
 * <p>Sort keys are walked in order while the running rank is within the rank end. Once the
 * retracted record is found (via the record equaliser), it is removed from its sort key's list.
 * Each following record in range then moves up one rank, emitted as an UPDATE_BEFORE of the
 * previous occupant and an UPDATE_AFTER of the new occupant. If the walk ends while still
 * inside the rank end, the last rank becomes empty, and a DELETE is emitted for the row that
 * previously held it.
 *
 * <p>If the retracted record is not reached inside the rank end (it ranks beyond the rank end),
 * it is still removed from {@code dataState}, so that the list stays in line with the count the
 * caller decrements, but nothing is emitted. If it cannot be found at all, no DELETE is emitted.
 * {@code sortedMap} is not modified here.
 *
 * @param sortedMap sort key -> number of records with that sort key, in sort order
 * @param sortKey sort key of the retracted record
 * @param inputRow the retracted record
 * @param out collector receiving the emitted changes
 * @throws RuntimeException if a needed dataState entry is missing and lenient mode is off
 */
private void retractRecordWithRowNumber(
        SortedMap<RowData, Long> sortedMap, RowData sortKey, RowData inputRow, Collector<RowData> out)
        throws Exception {
    Iterator<Map.Entry<RowData, Long>> iterator = sortedMap.entrySet().iterator();
    long currentRank = 0L;
    // Row that held the rank currently being refilled; starts as the retracted record itself.
    RowData prevRow = null;
    // Set once the retracted record has been found and removed from dataState.
    boolean findsSortKey = false;
    while (iterator.hasNext() && isInRankEnd(currentRank)) {
        Map.Entry<RowData, Long> entry = iterator.next();
        RowData key = entry.getKey();
        if (!findsSortKey && key.equals(sortKey)) {
            List<RowData> inputs = dataState.get(key);
            if (inputs == null) {
                // Skip the data if it's state is cleared because of state ttl.
                if (lenient) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                } else {
                    throw new RuntimeException(STATE_CLEARED_WARN_MSG);
                }
            } else {
                Iterator<RowData> inputIter = inputs.iterator();
                while (inputIter.hasNext() && isInRankEnd(currentRank)) {
                    RowData currentRow = inputIter.next();
                    if (!findsSortKey && equaliser.equals(currentRow, inputRow)) {
                        // Found the retracted record: drop it; its rank is refilled next.
                        prevRow = currentRow;
                        findsSortKey = true;
                        inputIter.remove();
                    } else if (findsSortKey) {
                        // A later record with the same sort key moves up one rank.
                        collectUpdateBefore(out, prevRow, currentRank);
                        collectUpdateAfter(out, currentRow, currentRank);
                        prevRow = currentRow;
                    }
                    currentRank += 1;
                }
                // Write back this sort key's list (the retracted record may have been removed).
                if (inputs.isEmpty()) {
                    dataState.remove(key);
                } else {
                    dataState.put(key, inputs);
                }
            }
        } else if (findsSortKey) {
            List<RowData> inputs = dataState.get(key);
            if (inputs == null) {
                // Same handling as the other branches: this key's records were cleared (e.g. by
                // state TTL), so there is nothing to move up for it. Without this check the loop
                // below would throw a NullPointerException.
                if (lenient) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                } else {
                    throw new RuntimeException(STATE_CLEARED_WARN_MSG);
                }
            } else {
                int i = 0;
                while (i < inputs.size() && isInRankEnd(currentRank)) {
                    RowData currentRow = inputs.get(i);
                    // Every record of a later sort key also moves up one rank.
                    collectUpdateBefore(out, prevRow, currentRank);
                    collectUpdateAfter(out, currentRow, currentRank);
                    prevRow = currentRow;
                    currentRank += 1;
                    i++;
                }
            }
        } else {
            // Sort keys before the retracted record's key are unaffected; just skip their ranks.
            currentRank += entry.getValue();
        }
    }

    if (!findsSortKey) {
        // The retracted record was not reached inside the rank end, or could not be found at
        // all. Nothing visible changes, so nothing is emitted (and prevRow is null, so a DELETE
        // would carry no row). The record must still leave dataState; otherwise dataState keeps
        // a row that the caller no longer counts in sortedMap.
        List<RowData> inputs = dataState.get(sortKey);
        if (inputs != null) {
            Iterator<RowData> inputIter = inputs.iterator();
            while (inputIter.hasNext()) {
                if (equaliser.equals(inputIter.next(), inputRow)) {
                    inputIter.remove();
                    if (inputs.isEmpty()) {
                        dataState.remove(sortKey);
                    } else {
                        dataState.put(sortKey, inputs);
                    }
                    break;
                }
            }
        }
        return;
    }

    if (isInRankEnd(currentRank)) {
        // there is no enough elements in Top-N, emit DELETE message for the retract record.
        collectDelete(out, prevRow, currentRank);
    }
}

RetractableTopNFunctionTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

/**
 * Tests for {@link RetractableTopNFunction}.
 */
public class RetractableTopNFunctionTest extends TopNFunctionTestBase {

    @Override
    protected AbstractTopNFunction createFunction(RankType rankType, RankRange rankRange,
            boolean generateUpdateBefore, boolean outputRankNumber) {
        return new RetractableTopNFunction(
                minTime.toMilliseconds(),
                maxTime.toMilliseconds(),
                inputRowType,
                sortKeyComparator,
                sortKeySelector,
                rankType,
                rankRange,
                generatedEqualiser,
                generateUpdateBefore,
                outputRankNumber);
    }

    @Test
    public void testProcessRetractMessageWithNotGenerateUpdateBefore() throws Exception {
        AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER,
                // Constant rank range: ranks 1 to 2.
                new ConstantRankRange(1, 2),
                // Do not generate UPDATE_BEFORE messages.
                false,
                // Append the rank number to every output row.
                true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
        testHarness.open();
        testHarness.processElement(insertRecord("book", 1L, 12));
        testHarness.processElement(insertRecord("book", 2L, 19));
        testHarness.processElement(insertRecord("book", 4L, 11));
        testHarness.processElement(updateBeforeRecord("book", 1L, 12));
        testHarness.processElement(insertRecord("book", 5L, 11));
        testHarness.processElement(insertRecord("fruit", 4L, 33));
        testHarness.processElement(insertRecord("fruit", 3L, 44));
        testHarness.processElement(insertRecord("fruit", 5L, 22));
        testHarness.close();

        // Print the produced changelog for inspection.
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (Object o : output) {
            StreamRecord<?> streamRecord = (StreamRecord<?>) o;
            System.out.println("Output element -> " + streamRecord.getValue());
        }

        // Comments list the "book"/"fruit" records in sort order after each input.
        List<Object> expectedOutput = new ArrayList<>();
        // +I ("book", 1L, 12) -> (1L, 12)
        expectedOutput.add(insertRecord("book", 1L, 12, 1L));
        // +I ("book", 2L, 19) -> (1L, 12), (2L, 19)
        expectedOutput.add(insertRecord("book", 2L, 19, 2L));
        // +I ("book", 4L, 11) -> (4L, 11), (1L, 12), (2L, 19): 11 takes rank 1, 12 moves to rank 2
        expectedOutput.add(updateAfterRecord("book", 4L, 11, 1L));
        expectedOutput.add(updateAfterRecord("book", 1L, 12, 2L));
        // -U ("book", 1L, 12) is retracted -> (4L, 11), (2L, 19): 19 moves up to rank 2
        expectedOutput.add(updateAfterRecord("book", 2L, 19, 2L));
        // +I ("book", 5L, 11) -> (4L, 11), (5L, 11), (2L, 19): the new row takes rank 2
        expectedOutput.add(updateAfterRecord("book", 5L, 11, 2L));
        // +I ("fruit", 4L, 33) and +I ("fruit", 3L, 44) fill ranks 1 and 2
        expectedOutput.add(insertRecord("fruit", 4L, 33, 1L));
        expectedOutput.add(insertRecord("fruit", 3L, 44, 2L));
        // +I ("fruit", 5L, 22) -> 22 takes rank 1, 33 moves to rank 2, 44 leaves the top 2
        expectedOutput.add(updateAfterRecord("fruit", 5L, 22, 1L));
        expectedOutput.add(updateAfterRecord("fruit", 4L, 33, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
Output element -> +I{row1=+I(book,1,12), row2=+I(1)}
Output element -> +I{row1=+I(book,2,19), row2=+I(2)}
Output element -> +U{row1=+I(book,4,11), row2=+I(1)}
Output element -> +U{row1=+I(book,1,12), row2=+I(2)}
Output element -> +U{row1=+I(book,2,19), row2=+I(2)}
Output element -> +U{row1=+I(book,5,11), row2=+I(2)}
Output element -> +I{row1=+I(fruit,4,33), row2=+I(1)}
Output element -> +I{row1=+I(fruit,3,44), row2=+I(2)}
Output element -> +U{row1=+I(fruit,5,22), row2=+I(1)}
Output element -> +U{row1=+I(fruit,4,33), row2=+I(2)}

UpdatableTopNFunction

支持更新流,是 RetractableTopNFunction 的简单实现版本,输入流中不能包含 DELETE 和 UPDATE_BEFORE 操作。

UpdatableTopNFunctionTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Tests for {@link UpdatableTopNFunction}.
 */
public class UpdatableTopNFunctionTest extends TopNFunctionTestBase {

    @Override
    protected AbstractTopNFunction createFunction(
            RankType rankType,
            RankRange rankRange,
            boolean generateUpdateBefore,
            boolean outputRankNumber) {

        return new UpdatableTopNFunction(
                minTime.toMilliseconds(),
                maxTime.toMilliseconds(),
                inputRowType,
                rowKeySelector,
                sortKeyComparator,
                sortKeySelector,
                rankType,
                rankRange,
                generateUpdateBefore,
                outputRankNumber,
                cacheSize);
    }

    @Test
    public void testVariableRankRange() throws Exception {
        AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER,
                // The rank end is read from column 1 (the second field) of each input row.
                new VariableRankRange(1),
                // Generate UPDATE_BEFORE messages.
                true,
                // Do not append a rank-number column to the output.
                false);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
        testHarness.open();
        testHarness.processElement(insertRecord("book", 2L, 19));
        testHarness.processElement(updateAfterRecord("book", 2L, 18));
        testHarness.processElement(insertRecord("fruit", 1L, 44));
        testHarness.processElement(updateAfterRecord("fruit", 1L, 33));
        testHarness.processElement(updateAfterRecord("fruit", 1L, 22));
        testHarness.close();

        // Print the produced changelog for inspection.
        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
        for (Object o : output) {
            StreamRecord<?> streamRecord = (StreamRecord<?>) o;
            System.out.println("Output element -> " + streamRecord.getValue());
        }

        // Each update is expected as a -U of the previous row followed by a +U of the new one.
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(insertRecord("book", 2L, 19));
        expectedOutput.add(updateBeforeRecord("book", 2L, 19));
        expectedOutput.add(updateAfterRecord("book", 2L, 18));
        expectedOutput.add(insertRecord("fruit", 1L, 44));
        expectedOutput.add(updateBeforeRecord("fruit", 1L, 44));
        expectedOutput.add(updateAfterRecord("fruit", 1L, 33));
        expectedOutput.add(updateBeforeRecord("fruit", 1L, 33));
        expectedOutput.add(updateAfterRecord("fruit", 1L, 22));
        assertorWithoutRowNumber
                .assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }
}

输出结果如下:

1
2
3
4
5
6
7
8
Output element -> +I(book,2,19)
Output element -> -U(book,2,19)
Output element -> +U(book,2,18)
Output element -> +I(fruit,1,44)
Output element -> -U(fruit,1,44)
Output element -> +U(fruit,1,33)
Output element -> -U(fruit,1,33)
Output element -> +U(fruit,1,22)