Flink-table-store: MergeTreeWriter

flink-table-store git

本文了解下 MergeTreeWriter 的源码实现。

leveldb

MergeTreeWriter

测试用例

SstFileTest

SstFileTest#testWriteAndReadSstFileWithStatsCollectingRollingFile — git 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/** Tests for {@link SstFileReader} and {@link SstFileWriter}. */
// NOTE(review): this is an excerpt — fields/helpers such as tempDir, checkRollingFiles,
// assertData and createSstFileReader are defined in the full source file.
public class SstFileTest {

/**
 * Generator producing random KeyValue test data, grouped per partition/bucket.
 */
private final SstTestDataGenerator gen =
SstTestDataGenerator.builder().memTableCapacity(20).build();

@RepeatedTest(1)
public void testWriteAndReadSstFileWithStatsCollectingRollingFile() throws Exception {
testWriteAndReadSstFileImpl("avro");
}


/**
 * Test flow:
 * 1. TestKeyValueGenerator produces a stream of records.
 * 2. Generation stops as soon as some partition/bucket first reaches the memTable capacity of 20.
 * 3. That memTable is sorted by (key, sequenceNumber).
 * 4. Records are deduplicated per key (the latest sequenceNumber wins).
 * 5. An in-memory sst file is built for the deduplicated kv list and its stats are assembled into an SstFileMeta.
 * 6. The in-memory kv list and the SstFileMeta are returned wrapped in a Data object.
 * 7. An SstFileWriter is created (one writer per partition/bucket) and the kv list is wrapped in a CloseableIterator.
 * 8. SstFileWriter.write() --> SstRollingFile.write() --> RollingFile.write() --> AvroBulkWriter.write().
 * 9. After writing, SstRollingFile.collectFile(Path) assembles the stats and returns an SstFileMeta.
 * 10. The Data's SstFileMeta is compared with the SstFileMeta returned by SstFileWriter.write().
 * 11. An SstFileReader is created -> sstFileReader.read -> SstFileRecordReader -> builds a FileSourceSplit -> the actual read goes through BulkFormat.Reader.
 * 12. A RecordReaderIterator wraps SstFileRecordReader to iterate the written data and compare it with the kv list held in Data.
 */
private void testWriteAndReadSstFileImpl(String format) throws Exception {

// Generate data until one partition/bucket reaches the memTable threshold of 20.
SstTestDataGenerator.Data data = gen.next();

// Create an SstFileWriter for that partition/bucket.
SstFileWriter writer = createSstFileWriter(tempDir.toString(), format);

SstFileMetaSerializer serializer =
new SstFileMetaSerializer(
TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);

// Perform the write, iterating over the generated records:
// SstFileWriter.write() --> SstRollingFile.write() --> RollingFile.write() --> AvroBulkWriter.write()
// When writing finishes, SstRollingFile.collectFile(Path) assembles the stats into SstFileMeta.
List<SstFileMeta> actualMetas =
writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);

// Compare the meta recorded for the in-memory sst file with the meta produced by the real on-disk write.
checkRollingFiles(data.meta, actualMetas, writer.suggestedFileSize());

SstFileReader reader = createSstFileReader(tempDir.toString(), format, null, null);
assertData(
data,
actualMetas,
TestKeyValueGenerator.KEY_SERIALIZER,
TestKeyValueGenerator.ROW_SERIALIZER,
serializer,
reader,
kv -> kv);
}

private SstFileWriter createSstFileWriter(String path, String format) {
FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
return new SstFileWriter.Factory(
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
// this format flushes after every appended row
new FlushingFileFormat(format),
pathFactory,
suggestedFileSize)
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
}

}
SstTestDataGenerator.java — git 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/** Random {@link SstFileMeta} generator. */
// NOTE(review): this is an excerpt — fields such as gen, memTables, numBuckets and
// memTableCapacity are declared in the full source file.
public class SstTestDataGenerator {

public Data next() {

while (true) {
// Generate one KeyValue record.
KeyValue kv = gen.next();
// Extract the key.
BinaryRowData key = (BinaryRowData) kv.key();
// Derive the partition from the record.
BinaryRowData partition = gen.getPartition(kv);
// Compute the bucket; numBuckets = 3.
int bucket = (key.hashCode() % numBuckets + numBuckets) % numBuckets;
// memTables is a List<Map<BinaryRowData, List<KeyValue>>>: one map per bucket,
// each mapping a partition to its list of KeyValues.
List<KeyValue> memTable =
memTables.get(bucket).computeIfAbsent(partition, k -> new ArrayList<>());
memTable.add(kv);

System.out.println(String.format("sequenceNumber -> %s,key-> %s, partition -> %s,bucket -> %s,op -> %s, value -> %s",
kv.sequenceNumber(),
"(" + kv.key().getInt(0) + "," + kv.key().getLong(1) + ")",
kv.value().getString(0) + "" + kv.value().getInt(1),
bucket,
kv.valueKind().toString(),
kv.value().getLong(4)
));

// memTableCapacity = 20. E.g. once partition 20211111 / bucket 0 reaches the
// threshold, an sst file is created for it and returned.
if (memTable.size() >= memTableCapacity) {

List<Data> result = createSstFiles(memTable, 0, partition, bucket);
memTable.clear();
assert result.size() == 1;
return result.get(0);
}
}
}

public List<Data> createSstFiles(
List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {

// Sort the records by (key, sequenceNumber).
gen.sort(kvs);

List<KeyValue> combined = new ArrayList<>();
for (int i = 0; i + 1 < kvs.size(); i++) {
KeyValue now = kvs.get(i);
KeyValue next = kvs.get(i + 1);
if (!now.key().equals(next.key())) {
// Keep only the entry with the highest sequenceNumber per key:
// `now` is kept exactly when it is the last record of its key group.
combined.add(now);
}
}
combined.add(kvs.get(kvs.size() - 1)); // the last record is always the newest of its key

// level = 0 here; higher levels get larger capacities.
int capacity = memTableCapacity;
for (int i = 0; i < level; i++) {
// Scale the capacity by memTableCapacity once per level.
capacity *= memTableCapacity;
}

List<Data> result = new ArrayList<>();
for (int i = 0; i < combined.size(); i += capacity) {
result.add(
// Create one sst file per `capacity`-sized slice of the deduplicated list.
createSstFile(
combined.subList(i, Math.min(i + capacity, combined.size())),
level,
partition,
bucket));
}
return result;
}

private Data createSstFile(List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {

// Collects min/max/null-count statistics for the key fields.
FieldStatsCollector keyStatsCollector =
new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);

// Collects min/max/null-count statistics for the value fields.
FieldStatsCollector valueStatsCollector =
new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);

long totalSize = 0;
BinaryRowData minKey = null;
BinaryRowData maxKey = null;
long minSequenceNumber = Long.MAX_VALUE;
long maxSequenceNumber = Long.MIN_VALUE;
for (KeyValue kv : kvs) {
BinaryRowData key = (BinaryRowData) kv.key();
BinaryRowData value = (BinaryRowData) kv.value();
totalSize += key.getSizeInBytes() + value.getSizeInBytes();
keyStatsCollector.collect(key);
valueStatsCollector.collect(value);
if (minKey == null || TestKeyValueGenerator.KEY_COMPARATOR.compare(key, minKey) < 0) {
minKey = key;
}
if (maxKey == null || TestKeyValueGenerator.KEY_COMPARATOR.compare(key, maxKey) > 0) {
maxKey = key;
}
minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
}

return new Data(
partition,
bucket,
// Build the sst file's metadata; the real write path collects it the same way.
new SstFileMeta(
"sst-" + UUID.randomUUID(),
totalSize,
kvs.size(),
minKey,
maxKey,
/*
 * Per-field min/max/null-count stats for the key, e.g.:
 * 0 - shopId(1, 9, 0): shopId min is 1, max is 9, nullCount is 0
 * 1 - orderId(-7709647343742251496, 6202316951969867995): orderId min/max, nullCount 0
 */
keyStatsCollector.extract(),
valueStatsCollector.extract(),
minSequenceNumber,
maxSequenceNumber,
level),
kvs);
}

/**
 * An in-memory SST file.
 * Used as an easily comparable stand-in for a real sst file:
 * its {@code meta} is compared with the SstFileMeta returned by SstFileWriter.
 */
public static class Data {

// partition of the file
public final BinaryRowData partition;
// bucket of the file
public final int bucket;
// metadata of the sst file
public final SstFileMeta meta;
// the records the file contains
public final List<KeyValue> content;

private Data(
BinaryRowData partition, int bucket, SstFileMeta meta, List<KeyValue> content) {
this.partition = partition;
this.bucket = bucket;
this.meta = meta;
this.content = content;
}
}

}
TestKeyValueGenerator.java — git 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/** Random {@link KeyValue} generator. */
// NOTE(review): this is an excerpt — fields such as random, addedOrders, deletedOrders,
// sequenceNumber and the serializers/comparator are declared in the full source file.
public class TestKeyValueGenerator {

public KeyValue next() {
int op = random.nextInt(5);
Order order = null;
ValueKind kind = ValueKind.ADD;
if (op == 0 && addedOrders.size() > 0) {
// delete an existing order
order = pick(addedOrders);
deletedOrders.add(order);
kind = ValueKind.DELETE;
} else if (op == 1) {
// update an order (possibly re-adding a previously deleted one)
if (random.nextBoolean() && deletedOrders.size() > 0) {
order = pick(deletedOrders);
} else if (addedOrders.size() > 0) {
order = pick(addedOrders);
}
if (order != null) {
order.update();
addedOrders.add(order);
kind = ValueKind.ADD;
}
}
if (order == null) {
// create a brand-new order
order = new Order();
addedOrders.add(order);
kind = ValueKind.ADD;
}
return new KeyValue()
.replace(
// the key is (shopId, orderId): kv.key().getInt(0) + kv.key().getLong(1)
KEY_SERIALIZER
.toBinaryRow(GenericRowData.of(order.shopId, order.orderId))
.copy(),
sequenceNumber++,
kind,
ROW_SERIALIZER
.toBinaryRow(
GenericRowData.of(
// the partition is dt: kv.value().getString(0)
StringData.fromString(order.dt),
order.hr,
order.shopId,
order.orderId,
order.itemId,
order.priceAmount == null
? null
: new GenericArrayData(order.priceAmount),
StringData.fromString(order.comment)))
.copy());
}

/**
 * Sorts the kv list by (key, sequenceNumber).
 */
public void sort(List<KeyValue> kvs) {
kvs.sort(
(a, b) -> {
int keyCompareResult = KEY_COMPARATOR.compare(a.key(), b.key());
return keyCompareResult != 0
? keyCompareResult
: Long.compare(a.sequenceNumber(), b.sequenceNumber());
});
}

}

gen 一直产生数据,key 为 (shopId, orderId):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
sequenceNumber -> 0,key-> (6,-5707870044488831310), partition -> 202111118,bucket -> 0,op -> ADD, value -> 2780980699221738542
sequenceNumber -> 1,key-> (6,-5707870044488831310), partition -> 202111118,bucket -> 0,op -> DELETE, value -> 2780980699221738542
sequenceNumber -> 2,key-> (5,-1628886049149411190), partition -> 202111118,bucket -> 0,op -> ADD, value -> -4843258062816469307
sequenceNumber -> 3,key-> (3,-1502604583186164397), partition -> 202111119,bucket -> 2,op -> ADD, value -> 7629700512330682069
sequenceNumber -> 4,key-> (0,-7322677450167513398), partition -> 202111118,bucket -> 2,op -> ADD, value -> 0
sequenceNumber -> 5,key-> (0,-7322677450167513398), partition -> 202111118,bucket -> 2,op -> DELETE, value -> 0
sequenceNumber -> 6,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 8884526624823745720
sequenceNumber -> 7,key-> (6,4757281131655774586), partition -> 202111108,bucket -> 2,op -> ADD, value -> -1390660808551069179
sequenceNumber -> 8,key-> (9,-47790794541920130), partition -> 202111119,bucket -> 1,op -> ADD, value -> -3041082948826019863
sequenceNumber -> 9,key-> (2,6656455485638391580), partition -> 202111118,bucket -> 0,op -> ADD, value -> 1539430626988409641
sequenceNumber -> 10,key-> (6,-7180799390720755610), partition -> 202111119,bucket -> 0,op -> ADD, value -> -471344025710360044
sequenceNumber -> 11,key-> (5,-6044235594482092723), partition -> 202111118,bucket -> 2,op -> ADD, value -> -6168492336680241066
sequenceNumber -> 12,key-> (3,3437933508159239703), partition -> 202111119,bucket -> 2,op -> ADD, value -> -2771616260199190183
sequenceNumber -> 13,key-> (3,1406959166072420180), partition -> 202111118,bucket -> 1,op -> ADD, value -> 0
sequenceNumber -> 14,key-> (9,1548022128799387646), partition -> 202111118,bucket -> 2,op -> ADD, value -> 0
sequenceNumber -> 15,key-> (3,7529185916302791697), partition -> 202111108,bucket -> 2,op -> ADD, value -> 7061550199722317899
sequenceNumber -> 16,key-> (4,-6161397105244672436), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7720821486888822402
sequenceNumber -> 17,key-> (5,-1628886049149411190), partition -> 202111118,bucket -> 0,op -> DELETE, value -> -4843258062816469307
sequenceNumber -> 18,key-> (8,9071325512152982886), partition -> 202111108,bucket -> 2,op -> ADD, value -> -5977331971180020420
sequenceNumber -> 19,key-> (9,6189042875316582636), partition -> 202111108,bucket -> 2,op -> ADD, value -> 3021498374064348785
sequenceNumber -> 20,key-> (9,6343418366398619495), partition -> 202111109,bucket -> 1,op -> ADD, value -> -6714242845052107042
sequenceNumber -> 21,key-> (3,-6539095021197140424), partition -> 202111108,bucket -> 1,op -> ADD, value -> 8378366812547696285
sequenceNumber -> 22,key-> (6,-8753039155605858734), partition -> 202111119,bucket -> 1,op -> ADD, value -> 5512574968172962020
sequenceNumber -> 23,key-> (2,4925395670238937618), partition -> 202111108,bucket -> 0,op -> ADD, value -> -7925549160503055693
sequenceNumber -> 24,key-> (5,-6044235594482092723), partition -> 202111118,bucket -> 2,op -> DELETE, value -> -6168492336680241066
sequenceNumber -> 25,key-> (4,-5572761828207512513), partition -> 202111108,bucket -> 0,op -> ADD, value -> -1780575509543362575
sequenceNumber -> 26,key-> (1,-7142099498597121533), partition -> 202111109,bucket -> 0,op -> ADD, value -> 1917445126268645259
sequenceNumber -> 27,key-> (0,-7322677450167513398), partition -> 202111118,bucket -> 2,op -> ADD, value -> 9095332047559518137
sequenceNumber -> 28,key-> (3,-1502604583186164397), partition -> 202111119,bucket -> 2,op -> DELETE, value -> 7629700512330682069
sequenceNumber -> 29,key-> (7,-4422302527273897180), partition -> 202111118,bucket -> 1,op -> ADD, value -> 0
sequenceNumber -> 30,key-> (0,-7322677450167513398), partition -> 202111118,bucket -> 2,op -> DELETE, value -> 9095332047559518137
sequenceNumber -> 31,key-> (5,8365091192911527232), partition -> 202111109,bucket -> 2,op -> ADD, value -> -171710651702984100
sequenceNumber -> 32,key-> (1,6624769519813278620), partition -> 202111108,bucket -> 1,op -> ADD, value -> -1156217459149519162
sequenceNumber -> 33,key-> (4,-5572761828207512513), partition -> 202111108,bucket -> 0,op -> DELETE, value -> -1780575509543362575
sequenceNumber -> 34,key-> (3,-1502604583186164397), partition -> 202111119,bucket -> 2,op -> ADD, value -> 3846349663287424781
sequenceNumber -> 35,key-> (5,-1628886049149411190), partition -> 202111118,bucket -> 0,op -> ADD, value -> -6332539298434244959
sequenceNumber -> 36,key-> (1,-5623847903170117779), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 37,key-> (4,300655443661245521), partition -> 202111118,bucket -> 1,op -> ADD, value -> 5387082838317835172
sequenceNumber -> 38,key-> (9,7861014525117262749), partition -> 202111119,bucket -> 2,op -> ADD, value -> -9122542604915127637
sequenceNumber -> 39,key-> (9,-47790794541920130), partition -> 202111119,bucket -> 1,op -> ADD, value -> -227302505361441636
sequenceNumber -> 40,key-> (4,300655443661245521), partition -> 202111118,bucket -> 1,op -> ADD, value -> 7922649361152425018
sequenceNumber -> 41,key-> (5,358240165604482159), partition -> 202111119,bucket -> 1,op -> ADD, value -> 7317582423751248438
sequenceNumber -> 42,key-> (0,-3660045172252280972), partition -> 202111109,bucket -> 1,op -> ADD, value -> -328975871197147907
sequenceNumber -> 43,key-> (0,-6226701972071386686), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 44,key-> (7,-1094427646050551581), partition -> 202111109,bucket -> 2,op -> ADD, value -> 4801112210107529658
sequenceNumber -> 45,key-> (8,8881224827715078628), partition -> 202111119,bucket -> 1,op -> ADD, value -> -6356315545715006682
sequenceNumber -> 46,key-> (2,6656455485638391580), partition -> 202111118,bucket -> 0,op -> ADD, value -> -7107196345880948914
sequenceNumber -> 47,key-> (6,160286918159499774), partition -> 202111119,bucket -> 2,op -> ADD, value -> -8739427651217120826
sequenceNumber -> 48,key-> (1,6624769519813278620), partition -> 202111108,bucket -> 1,op -> DELETE, value -> -1156217459149519162
sequenceNumber -> 49,key-> (3,-4529073475644960156), partition -> 202111118,bucket -> 0,op -> ADD, value -> -6329541255341810188
sequenceNumber -> 50,key-> (5,-8147739948029934462), partition -> 202111118,bucket -> 0,op -> ADD, value -> -1573247422349659478
sequenceNumber -> 51,key-> (1,-5452898194577942329), partition -> 202111109,bucket -> 1,op -> ADD, value -> -3089621422791274897
sequenceNumber -> 52,key-> (2,2615073161895980616), partition -> 202111119,bucket -> 0,op -> ADD, value -> -3380892025983806981
sequenceNumber -> 53,key-> (2,-6849632269263576984), partition -> 202111119,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 54,key-> (3,3608248884997787695), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3940207861520786810
sequenceNumber -> 55,key-> (0,2337641240443581856), partition -> 202111109,bucket -> 1,op -> ADD, value -> -8588877348912160578
sequenceNumber -> 56,key-> (8,-4178514139166246973), partition -> 202111108,bucket -> 1,op -> ADD, value -> 7917094582753434374
sequenceNumber -> 57,key-> (0,8052082081818425113), partition -> 202111119,bucket -> 0,op -> ADD, value -> -4394687097038065501
sequenceNumber -> 58,key-> (1,7579925405939227085), partition -> 202111118,bucket -> 1,op -> ADD, value -> -810978710328400623
sequenceNumber -> 59,key-> (0,8105308727850398433), partition -> 202111119,bucket -> 0,op -> ADD, value -> 5251140189951144076
sequenceNumber -> 60,key-> (4,-5185295859586056999), partition -> 202111108,bucket -> 2,op -> ADD, value -> -1799448936150420608
sequenceNumber -> 61,key-> (2,8374523283379480403), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5038785059845636753
sequenceNumber -> 62,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6587997598195970439
sequenceNumber -> 63,key-> (4,-5572761828207512513), partition -> 202111108,bucket -> 0,op -> ADD, value -> 4318264364606337424
sequenceNumber -> 64,key-> (1,1500104998893080687), partition -> 202111109,bucket -> 2,op -> ADD, value -> -5928705892222458487
sequenceNumber -> 65,key-> (5,-1018793110018941802), partition -> 202111108,bucket -> 1,op -> ADD, value -> 4978555998504311137
sequenceNumber -> 66,key-> (9,-2708037046618742102), partition -> 202111109,bucket -> 2,op -> ADD, value -> 4185830207643971286
sequenceNumber -> 67,key-> (8,8356260898591690767), partition -> 202111118,bucket -> 0,op -> ADD, value -> -8056709699793297362
sequenceNumber -> 68,key-> (1,-5452898194577942329), partition -> 202111109,bucket -> 1,op -> ADD, value -> -1004674958420378720
sequenceNumber -> 69,key-> (0,-7278244430086253408), partition -> 202111108,bucket -> 2,op -> ADD, value -> 8124302997183225112
sequenceNumber -> 70,key-> (5,-6044235594482092723), partition -> 202111118,bucket -> 2,op -> ADD, value -> 5347576024187315055
sequenceNumber -> 71,key-> (0,-7322677450167513398), partition -> 202111118,bucket -> 2,op -> ADD, value -> 6541352840070592737
sequenceNumber -> 72,key-> (3,1406959166072420180), partition -> 202111118,bucket -> 1,op -> ADD, value -> 3656191246660617952
sequenceNumber -> 73,key-> (0,-3728475157919153383), partition -> 202111119,bucket -> 0,op -> ADD, value -> 5138071113563883920
sequenceNumber -> 74,key-> (4,9049862483926656699), partition -> 202111119,bucket -> 1,op -> ADD, value -> 335682238008025261
sequenceNumber -> 75,key-> (0,-7278244430086253408), partition -> 202111108,bucket -> 2,op -> ADD, value -> 147913672200092986
sequenceNumber -> 76,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7183355362201992291
sequenceNumber -> 77,key-> (5,-7361894621669707690), partition -> 202111109,bucket -> 1,op -> ADD, value -> 2151354001189111669
sequenceNumber -> 78,key-> (8,-2203809412923262991), partition -> 202111119,bucket -> 1,op -> ADD, value -> 3936942235225713244
sequenceNumber -> 79,key-> (5,-6505213751965926244), partition -> 202111108,bucket -> 2,op -> ADD, value -> 0
sequenceNumber -> 80,key-> (6,586933908778291566), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5353448919407202023
sequenceNumber -> 81,key-> (2,4167920832653664558), partition -> 202111108,bucket -> 2,op -> ADD, value -> -1608623588433602165
sequenceNumber -> 82,key-> (2,1841816909855723911), partition -> 202111118,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 83,key-> (4,-972330344039028842), partition -> 202111109,bucket -> 1,op -> ADD, value -> -3509805453120084345
sequenceNumber -> 84,key-> (8,2824656755364722708), partition -> 202111108,bucket -> 0,op -> ADD, value -> -5257788714540757740
sequenceNumber -> 85,key-> (3,4825546765402666544), partition -> 202111108,bucket -> 2,op -> ADD, value -> -2391968129098960716
sequenceNumber -> 86,key-> (3,7529185916302791697), partition -> 202111108,bucket -> 2,op -> ADD, value -> -3918158072347315804
sequenceNumber -> 87,key-> (2,-3348977801879129568), partition -> 202111109,bucket -> 2,op -> ADD, value -> 1881148083149836831
sequenceNumber -> 88,key-> (2,4013523848884005729), partition -> 202111108,bucket -> 2,op -> ADD, value -> 0
sequenceNumber -> 89,key-> (6,-5707870044488831310), partition -> 202111118,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 90,key-> (0,8052082081818425113), partition -> 202111119,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 91,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> ADD, value -> -2300521514824600373
sequenceNumber -> 92,key-> (2,2615073161895980616), partition -> 202111119,bucket -> 0,op -> ADD, value -> -9184374245320825550
sequenceNumber -> 93,key-> (6,-4929214422972869044), partition -> 202111118,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 94,key-> (0,-1119753832294541381), partition -> 202111108,bucket -> 0,op -> ADD, value -> -4659351591347526663
sequenceNumber -> 95,key-> (8,8881224827715078628), partition -> 202111119,bucket -> 1,op -> DELETE, value -> -6356315545715006682
sequenceNumber -> 96,key-> (2,4925395670238937618), partition -> 202111108,bucket -> 0,op -> DELETE, value -> -7925549160503055693
sequenceNumber -> 97,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> ADD, value -> 3157902831734464520
sequenceNumber -> 98,key-> (5,-2843916878703362404), partition -> 202111118,bucket -> 1,op -> ADD, value -> -7274338587888136170
sequenceNumber -> 99,key-> (7,-9073451180094251128), partition -> 202111109,bucket -> 0,op -> ADD, value -> -8421032384434482841
sequenceNumber -> 100,key-> (9,-2058923199536498074), partition -> 202111118,bucket -> 1,op -> ADD, value -> 3025392192359206114
sequenceNumber -> 101,key-> (4,-6997582686150743744), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6881154136677825108
sequenceNumber -> 102,key-> (6,1396773201962230824), partition -> 202111108,bucket -> 1,op -> ADD, value -> -395580717194806860
sequenceNumber -> 103,key-> (5,2836340277676703640), partition -> 202111108,bucket -> 1,op -> ADD, value -> -6889713077159086526
sequenceNumber -> 104,key-> (5,-5531587458323916081), partition -> 202111109,bucket -> 1,op -> ADD, value -> -7936623144280934674
sequenceNumber -> 105,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> DELETE, value -> -2300521514824600373
sequenceNumber -> 106,key-> (7,3761238010779910460), partition -> 202111118,bucket -> 0,op -> ADD, value -> 1741046247964567542
sequenceNumber -> 107,key-> (9,-2058923199536498074), partition -> 202111118,bucket -> 1,op -> DELETE, value -> 3025392192359206114
sequenceNumber -> 108,key-> (5,-3150172398625062392), partition -> 202111109,bucket -> 1,op -> ADD, value -> 2414372112695353287
sequenceNumber -> 109,key-> (2,4925395670238937618), partition -> 202111108,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 110,key-> (2,-2517148717059342296), partition -> 202111108,bucket -> 0,op -> ADD, value -> 4871096411802221114
sequenceNumber -> 111,key-> (4,-7429391660457277238), partition -> 202111119,bucket -> 0,op -> ADD, value -> 1403520732423394697
sequenceNumber -> 112,key-> (3,-8204171576376413410), partition -> 202111108,bucket -> 0,op -> ADD, value -> 7128770787287732271
sequenceNumber -> 113,key-> (9,-824271909935124719), partition -> 202111109,bucket -> 0,op -> ADD, value -> 2544637467764560104
sequenceNumber -> 114,key-> (0,-2652914563471254026), partition -> 202111118,bucket -> 1,op -> ADD, value -> 1480495876038550410
sequenceNumber -> 115,key-> (9,2449548613592556649), partition -> 202111109,bucket -> 2,op -> ADD, value -> -8344888900456274592
sequenceNumber -> 116,key-> (8,-4287211979177213572), partition -> 202111108,bucket -> 1,op -> ADD, value -> 4860703232778193421
sequenceNumber -> 117,key-> (5,-2279252544212225951), partition -> 202111118,bucket -> 2,op -> ADD, value -> 2246469721969240119
sequenceNumber -> 118,key-> (4,-680031043215432041), partition -> 202111118,bucket -> 0,op -> ADD, value -> -2631300262004576551
sequenceNumber -> 119,key-> (1,6131719117356614326), partition -> 202111119,bucket -> 1,op -> ADD, value -> 3625174553481934943
sequenceNumber -> 120,key-> (3,-1502604583186164397), partition -> 202111119,bucket -> 2,op -> DELETE, value -> 3846349663287424781
sequenceNumber -> 121,key-> (1,2013654839833803469), partition -> 202111119,bucket -> 2,op -> ADD, value -> -7807137138863559106
sequenceNumber -> 122,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 7183355362201992291
sequenceNumber -> 123,key-> (4,-2809234249706038955), partition -> 202111109,bucket -> 2,op -> ADD, value -> -2773621620197680128
sequenceNumber -> 124,key-> (1,2013654839833803469), partition -> 202111119,bucket -> 2,op -> DELETE, value -> -7807137138863559106
sequenceNumber -> 125,key-> (3,2621136369545533468), partition -> 202111109,bucket -> 2,op -> ADD, value -> 3438152369562492786
sequenceNumber -> 126,key-> (0,-7493190883694484629), partition -> 202111119,bucket -> 0,op -> ADD, value -> -691195151125151453
sequenceNumber -> 127,key-> (8,-1490509224765896290), partition -> 202111108,bucket -> 2,op -> ADD, value -> -1869246074664134885
sequenceNumber -> 128,key-> (9,5511132099839824070), partition -> 202111108,bucket -> 2,op -> ADD, value -> -6181385120508855967
sequenceNumber -> 129,key-> (9,3381161680610456082), partition -> 202111118,bucket -> 2,op -> ADD, value -> -5229930391996865858
sequenceNumber -> 130,key-> (9,-47790794541920130), partition -> 202111119,bucket -> 1,op -> DELETE, value -> -227302505361441636
sequenceNumber -> 131,key-> (1,-1992061607722333830), partition -> 202111108,bucket -> 1,op -> ADD, value -> 740900030264769173
sequenceNumber -> 132,key-> (0,2752227170646411346), partition -> 202111108,bucket -> 2,op -> ADD, value -> -3727436349553309033
sequenceNumber -> 133,key-> (0,-7493190883694484629), partition -> 202111119,bucket -> 0,op -> DELETE, value -> -691195151125151453
sequenceNumber -> 134,key-> (1,2013654839833803469), partition -> 202111119,bucket -> 2,op -> ADD, value -> -7278425883508791793
sequenceNumber -> 135,key-> (1,-995426609618169301), partition -> 202111119,bucket -> 2,op -> ADD, value -> -7775193237863598706
sequenceNumber -> 136,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 3157902831734464520
sequenceNumber -> 137,key-> (2,911633586820358694), partition -> 202111109,bucket -> 2,op -> ADD, value -> -7827113035897188605
sequenceNumber -> 138,key-> (2,9211428132443295825), partition -> 202111108,bucket -> 1,op -> ADD, value -> 3254598811000233626
sequenceNumber -> 139,key-> (2,4167920832653664558), partition -> 202111108,bucket -> 2,op -> DELETE, value -> -1608623588433602165
sequenceNumber -> 140,key-> (7,-3849150167657665208), partition -> 202111109,bucket -> 0,op -> ADD, value -> 4825606727435135080
sequenceNumber -> 141,key-> (3,-8204171576376413410), partition -> 202111108,bucket -> 0,op -> ADD, value -> 1994264609579986073
sequenceNumber -> 142,key-> (3,-6539095021197140424), partition -> 202111108,bucket -> 1,op -> DELETE, value -> 8378366812547696285
sequenceNumber -> 143,key-> (3,-7200693063745648300), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3495569691632159874
...

partition='202111109', bucket=0 的 memTable 首先达到容量阈值 20,数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sequenceNumber -> 6,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 8884526624823745720
sequenceNumber -> 16,key-> (4,-6161397105244672436), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7720821486888822402
sequenceNumber -> 26,key-> (1,-7142099498597121533), partition -> 202111109,bucket -> 0,op -> ADD, value -> 1917445126268645259
sequenceNumber -> 36,key-> (1,-5623847903170117779), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 43,key-> (0,-6226701972071386686), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 54,key-> (3,3608248884997787695), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3940207861520786810
sequenceNumber -> 61,key-> (2,8374523283379480403), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5038785059845636753
sequenceNumber -> 62,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6587997598195970439
sequenceNumber -> 76,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7183355362201992291
sequenceNumber -> 80,key-> (6,586933908778291566), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5353448919407202023
sequenceNumber -> 91,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> ADD, value -> -2300521514824600373
sequenceNumber -> 97,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> ADD, value -> 3157902831734464520
sequenceNumber -> 99,key-> (7,-9073451180094251128), partition -> 202111109,bucket -> 0,op -> ADD, value -> -8421032384434482841
sequenceNumber -> 101,key-> (4,-6997582686150743744), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6881154136677825108
sequenceNumber -> 105,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> DELETE, value -> -2300521514824600373
sequenceNumber -> 113,key-> (9,-824271909935124719), partition -> 202111109,bucket -> 0,op -> ADD, value -> 2544637467764560104
sequenceNumber -> 122,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 7183355362201992291
sequenceNumber -> 136,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 3157902831734464520
sequenceNumber -> 140,key-> (7,-3849150167657665208), partition -> 202111109,bucket -> 0,op -> ADD, value -> 4825606727435135080
sequenceNumber -> 143,key-> (3,-7200693063745648300), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3495569691632159874

将 memTable 中的数据按 key,sequenceNumber 排序,数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sequenceNumber -> 43,key-> (0,-6226701972071386686), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 26,key-> (1,-7142099498597121533), partition -> 202111109,bucket -> 0,op -> ADD, value -> 1917445126268645259
sequenceNumber -> 36,key-> (1,-5623847903170117779), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 61,key-> (2,8374523283379480403), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5038785059845636753
sequenceNumber -> 143,key-> (3,-7200693063745648300), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3495569691632159874
sequenceNumber -> 54,key-> (3,3608248884997787695), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3940207861520786810
sequenceNumber -> 101,key-> (4,-6997582686150743744), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6881154136677825108
sequenceNumber -> 16,key-> (4,-6161397105244672436), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7720821486888822402
sequenceNumber -> 76,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7183355362201992291
sequenceNumber -> 122,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 7183355362201992291
sequenceNumber -> 80,key-> (6,586933908778291566), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5353448919407202023
sequenceNumber -> 6,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 8884526624823745720
sequenceNumber -> 62,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6587997598195970439
sequenceNumber -> 99,key-> (7,-9073451180094251128), partition -> 202111109,bucket -> 0,op -> ADD, value -> -8421032384434482841
sequenceNumber -> 140,key-> (7,-3849150167657665208), partition -> 202111109,bucket -> 0,op -> ADD, value -> 4825606727435135080
sequenceNumber -> 97,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> ADD, value -> 3157902831734464520
sequenceNumber -> 136,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 3157902831734464520
sequenceNumber -> 113,key-> (9,-824271909935124719), partition -> 202111109,bucket -> 0,op -> ADD, value -> 2544637467764560104
sequenceNumber -> 91,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> ADD, value -> -2300521514824600373
sequenceNumber -> 105,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> DELETE, value -> -2300521514824600373

再按 key 聚合去重,数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sequenceNumber -> 43,key-> (0,-6226701972071386686), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 26,key-> (1,-7142099498597121533), partition -> 202111109,bucket -> 0,op -> ADD, value -> 1917445126268645259
sequenceNumber -> 36,key-> (1,-5623847903170117779), partition -> 202111109,bucket -> 0,op -> ADD, value -> 0
sequenceNumber -> 61,key-> (2,8374523283379480403), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5038785059845636753
sequenceNumber -> 143,key-> (3,-7200693063745648300), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3495569691632159874
sequenceNumber -> 54,key-> (3,3608248884997787695), partition -> 202111109,bucket -> 0,op -> ADD, value -> -3940207861520786810
sequenceNumber -> 101,key-> (4,-6997582686150743744), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6881154136677825108
sequenceNumber -> 16,key-> (4,-6161397105244672436), partition -> 202111109,bucket -> 0,op -> ADD, value -> 7720821486888822402
sequenceNumber -> 122,key-> (5,4613811878340843445), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 7183355362201992291
sequenceNumber -> 80,key-> (6,586933908778291566), partition -> 202111109,bucket -> 0,op -> ADD, value -> 5353448919407202023
sequenceNumber -> 62,key-> (6,5937024096732189195), partition -> 202111109,bucket -> 0,op -> ADD, value -> 6587997598195970439
sequenceNumber -> 99,key-> (7,-9073451180094251128), partition -> 202111109,bucket -> 0,op -> ADD, value -> -8421032384434482841
sequenceNumber -> 140,key-> (7,-3849150167657665208), partition -> 202111109,bucket -> 0,op -> ADD, value -> 4825606727435135080
sequenceNumber -> 136,key-> (8,-6326910797838432808), partition -> 202111109,bucket -> 0,op -> DELETE, value -> 3157902831734464520
sequenceNumber -> 113,key-> (9,-824271909935124719), partition -> 202111109,bucket -> 0,op -> ADD, value -> 2544637467764560104
sequenceNumber -> 105,key-> (9,-3592114153078238858), partition -> 202111109,bucket -> 0,op -> DELETE, value -> -2300521514824600373

ManifestFileTest

ManifestFileTest#testWriteAndReadManifestFile — git 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/** Tests for {@link ManifestFile}. */
// NOTE(review): this is an excerpt — helpers such as createManifestFile and
// checkRollingFiles are defined in the full source file.
public class ManifestFileTest {
private final ManifestTestDataGenerator gen = ManifestTestDataGenerator.builder().build();

@RepeatedTest(10)
public void testWriteAndReadManifestFile() {

// Generate a batch of data assembled into List<ManifestEntry>.
List<ManifestEntry> entries = generateData();

// Build the expected ManifestFileMeta, compared later against the metas
// returned by the real write of the manifest file.
ManifestFileMeta meta = gen.createManifestFileMeta(entries);

// Create the ManifestFile used to read and write manifest files.
ManifestFile manifestFile = createManifestFile(tempDir.toString());
// ManifestFile.write() -> ManifestRollingFile.write() -> RollingFile.write() ->
// ManifestRollingFile.collectFile(Path), which returns the ManifestFileMeta.
List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
checkRollingFiles(meta, actualMetas, manifestFile.suggestedFileSize());

List<ManifestEntry> actualEntries =
actualMetas.stream()
// ManifestFile.read() -> FileUtils.readListFromFile() builds a SourceSplit -> BulkFormat.Reader
.flatMap(m -> manifestFile.read(m.fileName()).stream())
.collect(Collectors.toList());
assertThat(actualEntries).isEqualTo(entries);
}

/** Generates 30 random {@link ManifestEntry}s, logging each one for inspection. */
private List<ManifestEntry> generateData() {

List<ManifestEntry> entries = new ArrayList<>();
for (int i = 0; i < 30; i++) {
ManifestEntry manifestEntry = gen.next();

System.out.println(String.format("manifestEntry: valueKind -> %s,partition -> %s,bucket -> %s,level -> %s," +
"sst file name -> %s",
manifestEntry.kind().toString(),
manifestEntry.partition().getString(0) + "" + manifestEntry.partition().getInt(1),
manifestEntry.bucket(),
// BUG FIX: the "level" placeholder was previously fed manifestEntry.bucket()
// a second time; print the file's actual level instead.
manifestEntry.file().level(),
manifestEntry.file().fileName()));
System.out.println();

entries.add(manifestEntry);

}
return entries;
}
}
ManifestTestDataGenerator.javagit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/** Random {@link ManifestEntry} generator. */
public class ManifestTestDataGenerator {

    /** Maximum number of sst files a level may hold before a merge is triggered. */
    private static final int LEVEL_CAPACITY = 3;

    // Indexed by bucket. Each element maps partition -> level structure, where the
    // outer list index is the LSM level and the inner list holds that level's files.
    private final List<Map<BinaryRowData, List<List<SstTestDataGenerator.Data>>>> levels;
    private final SstTestDataGenerator gen;

    private final LinkedList<ManifestEntry> bufferedResults;

    /**
     * Returns the next {@link ManifestEntry}. Buffered entries are drained first;
     * otherwise a new in-memory sst file is generated, registered in the level
     * structure, its ADD entry (plus any DELETE/ADD entries produced by a level
     * merge) is buffered, and one buffered entry is popped.
     */
    public ManifestEntry next() {
        if (bufferedResults.size() > 0) {
            return bufferedResults.poll();
        }

        // Generate an in-memory sst file (Data carries both kv content and meta).
        SstTestDataGenerator.Data file = gen.next();

        // BUG FIX: the original passed file.bucket twice, so the "level" column of
        // this debug line actually printed the bucket. Use the file's real level.
        System.out.println(String.format("sst file: partition -> %s,bucket -> %s,level -> %s," +
                        "sst file name -> %s",
                file.partition.getString(0) + "" + file.partition.getInt(1),
                file.bucket, file.meta.level(), file.meta.fileName()));

        // levels: List<Map<BinaryRowData, List<List<Data>>>>, indexed by bucket
        // Map<BinaryRowData, List<List<Data>>>: partition -> level structure
        // List<List<Data>>: one element per LSM level
        // List<Data>: the lsm data files of that level
        List<List<SstTestDataGenerator.Data>> bucketLevels =
                levels.get(file.bucket).computeIfAbsent(file.partition, k -> new ArrayList<>());
        ensureCapacity(bucketLevels, file.meta.level());

        List<SstTestDataGenerator.Data> level = bucketLevels.get(file.meta.level());
        // Add the new file to its level's lsm data file list.
        level.add(file);

        bufferedResults.push(
                new ManifestEntry(
                        ValueKind.ADD, file.partition, file.bucket, numBuckets, file.meta));
        // Merge levels if the file's level overflowed.
        mergeLevelsIfNeeded(file.partition, file.bucket);

        return bufferedResults.poll();
    }

    /**
     * Merges overflowing LSM levels for the given partition/bucket, buffering the
     * resulting DELETE and ADD manifest entries.
     */
    private void mergeLevelsIfNeeded(BinaryRowData partition, int bucket) {

        // This method uses a very simple merging strategy just for producing valid
        // data: whenever a level holds more than LEVEL_CAPACITY (3) files, it is
        // merged into the next level.
        List<List<SstTestDataGenerator.Data>> bucketLevels = levels.get(bucket).get(partition);
        int lastModifiedLevel = 0;
        while (bucketLevels.get(lastModifiedLevel).size() > LEVEL_CAPACITY) {

            // Remove all sst files in the current and next level.
            ensureCapacity(bucketLevels, lastModifiedLevel + 1);
            List<SstTestDataGenerator.Data> currentLevel = bucketLevels.get(lastModifiedLevel);
            List<SstTestDataGenerator.Data> nextLevel = bucketLevels.get(lastModifiedLevel + 1);
            List<KeyValue> kvs = new ArrayList<>();

            for (SstTestDataGenerator.Data file : currentLevel) {
                // Buffer a DELETE entry: the file disappears after the merge.
                bufferedResults.push(
                        new ManifestEntry(
                                ValueKind.DELETE, partition, bucket, numBuckets, file.meta));
                // Collect the current level's kv records for the merge.
                kvs.addAll(file.content);
            }
            currentLevel.clear();

            for (SstTestDataGenerator.Data file : nextLevel) {
                // Buffer a DELETE entry: the file disappears after the merge.
                bufferedResults.push(
                        new ManifestEntry(
                                ValueKind.DELETE, partition, bucket, numBuckets, file.meta));
                // Collect the next level's kv records for the merge.
                kvs.addAll(file.content);
            }
            nextLevel.clear();

            // Add back merged sst files: build new files on the next level from the
            // merged kv list (sorted by key/sequenceNumber, deduplicated by key).
            List<SstTestDataGenerator.Data> merged =
                    gen.createSstFiles(kvs, lastModifiedLevel + 1, partition, bucket);
            nextLevel.addAll(merged);

            for (SstTestDataGenerator.Data file : nextLevel) {
                // Buffer an ADD entry for every merged file now on the next level.
                bufferedResults.push(
                        new ManifestEntry(ValueKind.ADD, partition, bucket, numBuckets, file.meta));
            }

            lastModifiedLevel += 1;
        }
    }

    /**
     * Builds the expected {@link ManifestFileMeta} for the given entries: collects
     * partition field stats and counts added/deleted files.
     */
    public ManifestFileMeta createManifestFileMeta(List<ManifestEntry> entries) {
        Preconditions.checkArgument(
                !entries.isEmpty(), "Manifest entries are empty. Invalid test data.");

        FieldStatsCollector collector =
                new FieldStatsCollector(TestKeyValueGenerator.PARTITION_TYPE);

        long numAddedFiles = 0;
        long numDeletedFiles = 0;
        for (ManifestEntry entry : entries) {
            collector.collect(entry.partition());
            if (entry.kind() == ValueKind.ADD) {
                numAddedFiles++;
            } else {
                numDeletedFiles++;
            }
        }

        return new ManifestFileMeta(
                "manifest-" + UUID.randomUUID(),
                entries.size() * 100L,
                numAddedFiles,
                numDeletedFiles,
                collector.extract());
    }

    /**
     * Creates a {@link ManifestFile} writing under the given path with a random
     * suggested file size, to exercise file rolling.
     */
    private ManifestFile createManifestFile(String path) {
        FileStorePathFactory pathFactory =
                new FileStorePathFactory(
                        new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
        int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
        return new ManifestFile.Factory(
                        TestKeyValueGenerator.PARTITION_TYPE,
                        TestKeyValueGenerator.KEY_TYPE,
                        TestKeyValueGenerator.ROW_TYPE,
                        avro,
                        pathFactory,
                        suggestedFileSize)
                .create();
    }
}

sst files:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sst file: partition -> 202111119,bucket -> 1,level -> 1,sst file name -> sst-40dab8b8-a7c0-4707-9743-b3a91c2d12f7
sst file: partition -> 202111108,bucket -> 1,level -> 1,sst file name -> sst-1e8deb99-9685-48e9-99c2-162db58b68b2
sst file: partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-c572853c-3ceb-4df3-b002-4dfdc6d15e6a
sst file: partition -> 202111118,bucket -> 1,level -> 1,sst file name -> sst-4d10b7c1-d152-46d4-b9ca-4516e8c44b90
sst file: partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-25d3a4e5-b452-488d-889d-59a152d610a2
sst file: partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-70ff1288-6e62-473e-aab1-3950e87118e8
sst file: partition -> 202111119,bucket -> 1,level -> 1,sst file name -> sst-23feaaf4-5e2a-42b3-95b9-c2383c6125f7
sst file: partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-833c7b50-b39f-4a1b-9ee6-7fab8011499d
sst file: partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-686f60a7-29e8-4d67-81bd-07cc47fc4db8
sst file: partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-cf014187-3a62-48ca-bac3-26e9d899cc48 // lsm files > 3
sst file: partition -> 202111109,bucket -> 2,level -> 2,sst file name -> sst-0dc375a6-4819-4f2d-ab9e-8db87f9567f8
sst file: partition -> 202111108,bucket -> 1,level -> 1,sst file name -> sst-8c7b1c56-91f1-40bc-aeda-d00b181c919e
sst file: partition -> 202111108,bucket -> 2,level -> 2,sst file name -> sst-242a0b57-a2b0-4008-bb94-489d30c81f74
sst file: partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-15609d8a-de66-4628-b8c9-bee68ba7c9ae
sst file: partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-50b1ca0f-39bf-4c93-b2bd-37633682cbab
sst file: partition -> 202111119,bucket -> 2,level -> 2,sst file name -> sst-60666da9-27ef-4169-871b-34426c78fb30
sst file: partition -> 202111108,bucket -> 1,level -> 1,sst file name -> sst-fa182845-ed32-461c-b5f4-911fd625c9e6
sst file: partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-ad729042-e2a6-4c36-97d3-ab62ec1fe30f // lsm files > 3
sst file: partition -> 202111109,bucket -> 2,level -> 2,sst file name -> sst-90b8d42a-5fd7-4a23-9af4-0c449b543a57
sst file: partition -> 202111118,bucket -> 2,level -> 2,sst file name -> sst-83fdee55-99f8-4306-a24c-6a67f2cd7c8a

manifestEntry:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
manifestEntry: valueKind -> ADD,partition -> 202111119,bucket -> 1,level -> 1,sst file name -> sst-40dab8b8-a7c0-4707-9743-b3a91c2d12f7
manifestEntry: valueKind -> ADD,partition -> 202111108,bucket -> 1,level -> 1,sst file name -> sst-1e8deb99-9685-48e9-99c2-162db58b68b2
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-c572853c-3ceb-4df3-b002-4dfdc6d15e6a
manifestEntry: valueKind -> ADD,partition -> 202111118,bucket -> 1,level -> 1,sst file name -> sst-4d10b7c1-d152-46d4-b9ca-4516e8c44b90
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-25d3a4e5-b452-488d-889d-59a152d610a2
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-70ff1288-6e62-473e-aab1-3950e87118e8
manifestEntry: valueKind -> ADD,partition -> 202111119,bucket -> 1,level -> 1,sst file name -> sst-23feaaf4-5e2a-42b3-95b9-c2383c6125f7
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-833c7b50-b39f-4a1b-9ee6-7fab8011499d
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-686f60a7-29e8-4d67-81bd-07cc47fc4db8

// 1. 生成 sst-cf014187-3a62-48ca-bac3-26e9d899cc48 之后,partition -> 202111109,bucket -> 1,level -> 1 的 lsm files > 3 ,压栈
// 2. 依次标记删除当前 level 层的 lsm file
// 3. merge 生成一个新的 lsm file sst-b8c363dd-d918-4425-ae31-449d76d3e4a3
// 经过以上3步,出栈的顺序如下:
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-b8c363dd-d918-4425-ae31-449d76d3e4a3
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-cf014187-3a62-48ca-bac3-26e9d899cc48
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-686f60a7-29e8-4d67-81bd-07cc47fc4db8
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-833c7b50-b39f-4a1b-9ee6-7fab8011499d
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-c572853c-3ceb-4df3-b002-4dfdc6d15e6a
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-cf014187-3a62-48ca-bac3-26e9d899cc48

manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 2,level -> 2,sst file name -> sst-0dc375a6-4819-4f2d-ab9e-8db87f9567f8
manifestEntry: valueKind -> ADD,partition -> 202111108,bucket -> 1,level -> 1,sst file name -> sst-8c7b1c56-91f1-40bc-aeda-d00b181c919e
manifestEntry: valueKind -> ADD,partition -> 202111108,bucket -> 2,level -> 2,sst file name -> sst-242a0b57-a2b0-4008-bb94-489d30c81f74
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 1,level -> 1,sst file name -> sst-15609d8a-de66-4628-b8c9-bee68ba7c9ae
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-50b1ca0f-39bf-4c93-b2bd-37633682cbab
manifestEntry: valueKind -> ADD,partition -> 202111119,bucket -> 2,level -> 2,sst file name -> sst-60666da9-27ef-4169-871b-34426c78fb30
manifestEntry: valueKind -> ADD,partition -> 202111108,bucket -> 1,level -> 1,sst file name -> sst-fa182845-ed32-461c-b5f4-911fd625c9e6

manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-592e5b47-844b-4c49-8ffe-8e183560dbe5
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-ad729042-e2a6-4c36-97d3-ab62ec1fe30f
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-50b1ca0f-39bf-4c93-b2bd-37633682cbab
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-70ff1288-6e62-473e-aab1-3950e87118e8
manifestEntry: valueKind -> DELETE,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-25d3a4e5-b452-488d-889d-59a152d610a2
manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 0,level -> 0,sst file name -> sst-ad729042-e2a6-4c36-97d3-ab62ec1fe30f

manifestEntry: valueKind -> ADD,partition -> 202111109,bucket -> 2,level -> 2,sst file name -> sst-90b8d42a-5fd7-4a23-9af4-0c449b543a57
manifestEntry: valueKind -> ADD,partition -> 202111118,bucket -> 2,level -> 2,sst file name -> sst-83fdee55-99f8-4306-a24c-6a67f2cd7c8a

ManifestListTest

ManifestListTest#testWriteAndReadManifestListgit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/** Tests for {@link ManifestList}. */
public class ManifestListTest {
    private final ManifestTestDataGenerator gen = ManifestTestDataGenerator.builder().build();

    @RepeatedTest(1)
    public void testWriteAndReadManifestList() {

        // Generate kv data -> assemble ManifestEntry batches -> ManifestFileMeta list.
        List<ManifestFileMeta> expectedMetas = generateData();

        ManifestList manifestList = createManifestList(tempDir.toString());

        // Serialize and write every ManifestFileMeta in order.
        String manifestListName = manifestList.write(expectedMetas);

        // Read the list back and verify the round trip.
        List<ManifestFileMeta> actualMetas = manifestList.read(manifestListName);
        assertThat(actualMetas).isEqualTo(expectedMetas);
    }

    private List<ManifestFileMeta> generateData() {
        Random rnd = new Random();
        List<ManifestFileMeta> result = new ArrayList<>();
        for (int fileIdx = 0; fileIdx < 10; fileIdx++) {
            // Each meta aggregates a random-sized (1..10) batch of entries.
            int remaining = rnd.nextInt(10) + 1;
            List<ManifestEntry> batch = new ArrayList<>();
            while (remaining > 0) {
                batch.add(gen.next());
                remaining--;
            }
            result.add(gen.createManifestFileMeta(batch));
        }
        return result;
    }

    private ManifestList createManifestList(String path) {
        FileStorePathFactory pathFactory =
                new FileStorePathFactory(
                        new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
        ManifestList.Factory factory =
                new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
        return factory.create();
    }
}

IntervalPartitionTest

IntervalPartitionTest.javagit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/** Tests for {@link IntervalPartition}. */
public class IntervalPartitionTest {

@Test
public void testSameMinKey() {
runTest(
// input
"[100, 200], [100, 400], [100, 300], [100, 500]",
// expected output
"[100, 200] | [100, 300] | [100, 400] | [100, 500]");
}

@Test
public void testSameMaxKey() {
runTest(
// input
"[100, 500], [300, 500], [200, 500], [400, 500]",
// expected output
"[100, 500] | [200, 500] | [300, 500] | [400, 500]");
}

@Test
public void testSectionPartitioning() {
// 0 5 10 15 20 25 30
// |--------|
// |-|
// |-----|
// |-----|
// |-----------|
// |-------|
// 0 5 10 15 20 25 30
runTest(
// input
"[0, 9], [5, 7], [9, 15], [16, 22], [16, 28], [24, 32]",
// expected output: sections separated by '\n', sorted runs separated by '|'
"[0, 9] | [5, 7], [9, 15]\n[16, 22], [24, 32] | [16, 28]");

/*
 * Expected layout (section/run view):
 * p=p0/bucket-0/[0,9]
 *      /bucket-1/[5-7] [9-15]
 *
 * p=p1/bucket-0/[16,22] [24,32]
 *      /bucket-1/[16,28]
 *
 */

}
}
IntervalPartition.javagit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/** Algorithm to partition several sst files into the minimum number of {@link SortedRun}s. */
public class IntervalPartition {

    // FIX: these field declarations were missing from the snippet even though the
    // constructor assigns both of them.
    private final List<SstFileMeta> files;
    private final Comparator<RowData> keyComparator;

    public IntervalPartition(List<SstFileMeta> inputFiles, Comparator<RowData> keyComparator) {
        this.files = new ArrayList<>(inputFiles);
        // Sort the input sst file metas by minKey first, then by maxKey.
        this.files.sort(
                (o1, o2) -> {
                    int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
                    return leftResult == 0
                            ? keyComparator.compare(o1.maxKey(), o2.maxKey())
                            : leftResult;
                });
        this.keyComparator = keyComparator;
    }

    /**
     * Splits the sorted files into sections of mutually overlapping key ranges and
     * partitions each section into the minimum number of sorted runs.
     */
    public List<List<SortedRun>> partition() {
        List<List<SortedRun>> result = new ArrayList<>();
        List<SstFileMeta> section = new ArrayList<>();
        BinaryRowData bound = null;

        for (SstFileMeta meta : files) {
            if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
                // Larger than current right bound: conclude current section and create a new one.
                // e.g. [16,22]'s minKey exceeds the bound 15 set by [9,15], so the earlier
                // [0,9], [5,7], [9,15] are grouped into one section.
                result.add(partition(section));
                section.clear();
                bound = null;
            }
            section.add(meta);
            if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
                // Update right bound.
                bound = meta.maxKey();
            }
        }
        if (!section.isEmpty()) {
            // Conclude last section.
            result.add(partition(section));
        }

        return result;
    }

    /** Greedily assigns each file of one section to the run whose last maxKey is smallest. */
    private List<SortedRun> partition(List<SstFileMeta> metas) {

        // Priority queue: the run whose last sst file has the smallest maxKey comes first.
        PriorityQueue<List<SstFileMeta>> queue =
                new PriorityQueue<>(
                        (o1, o2) ->
                                // sort by max key of the last sst file
                                keyComparator.compare(
                                        o1.get(o1.size() - 1).maxKey(),
                                        o2.get(o2.size() - 1).maxKey()));

        // Create the initial run, e.g. [0,9] out of [0,9], [5,7], [9,15].
        List<SstFileMeta> firstRun = new ArrayList<>();
        firstRun.add(metas.get(0));
        queue.add(firstRun);

        for (int i = 1; i < metas.size(); i++) {
            SstFileMeta meta = metas.get(i);
            // any file list whose max key < meta.minKey() is sufficient,
            // for convenience we pick the smallest
            List<SstFileMeta> top = queue.poll();
            if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {
                // Append current file to an existing run,
                // e.g. [9,15] since its minKey 9 > [5,7]'s maxKey 7.
                top.add(meta);
            } else {
                // Overlap, e.g. [5,7]'s minKey 5 < [0,9]'s maxKey 9:
                // create a new sorted run for the file.
                List<SstFileMeta> newRun = new ArrayList<>();
                newRun.add(meta);
                queue.add(newRun);
            }
            queue.add(top);
        }

        // Order between runs does not matter.
        return queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList());
    }
}

UniversalCompactionTest

  • testSizeAmplification
UniversalCompactionTest#testSizeAmplificationgit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/** Test for {@link UniversalCompaction}. */
public class UniversalCompactionTest {

@Test
public void testSizeAmplification() {
UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
long[] sizes = new long[] {1};
// When the size-amplification condition holds (candidateSize * 100 strictly
// greater than maxSizeAmp * lastRunSize), CompactionUnit != null and all runs
// merge into one run whose size is the total of the inputs.

// input {1,1}: 1*100 > 25*1, CompactionUnit != null, {1,1} -> {2}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {2});

// input {1,2}: 1*100 > 25*2, CompactionUnit != null, {1,2} -> {3}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {3});

// input {1,3}: 1*100 > 25*3, CompactionUnit != null, {1,3} -> {4}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {4});

// input {1,4}: 1*100 = 25*4 (not strictly greater), CompactionUnit == null, {1,4}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 4});

// input {1,1,4}: 2*100 > 25*4, CompactionUnit != null, {1,1,4} -> {6}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {6});

// input {1,6}: 1*100 < 25*6, CompactionUnit == null, {1,6}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 6});

// input {1,1,6}: 2*100 > 25*6, CompactionUnit != null, {1,1,6} -> {8}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {8});

// input {1,8}: 1*100 < 25*8, CompactionUnit == null, {1,8}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 8});

// input {1,1,8}: 2*100 = 25*8 (not strictly greater), CompactionUnit == null, {1,1,8}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 8});

// input {1,1,1,8}: 3*100 > 25*8, CompactionUnit != null, {1,1,1,8} -> {11}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {11});

// input {1,11}: 1*100 < 25*11, CompactionUnit == null, {1,11}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 11});

// input {1,1,11}: 2*100 < 25*11, CompactionUnit == null, {1,1,11}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 11});

// input {1,1,1,11}: 3*100 > 25*11, CompactionUnit != null, {1,1,1,11} -> {14}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {14});

// input {1,14}: 1*100 < 25*14, CompactionUnit == null, {1,14}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 14});

// input {1,1,14}: 2*100 < 25*14, CompactionUnit == null, {1,1,14}
// (the original comment said "!= null", contradicting the unchanged result below)
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 14});

// input {1,1,1,14}: 3*100 < 25*14, CompactionUnit == null, {1,1,1,14}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 14});

// input {1,1,1,1,14}: 4*100 > 25*14, CompactionUnit != null, {1,1,1,1,14} -> {18}
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {18});
}

}
  • testSizeRatio
UniversalCompactionTest#testSizeRatiogit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/** Test for {@link UniversalCompaction}. */
public class UniversalCompactionTest {

@Test
public void testSizeRatio() {
UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
long[] sizes = new long[] {1, 1, 1, 1};
// Size-ratio picking only triggers once the number of runs reaches maxRunNum (5).
// Starting from the earliest run, the next run is included while
// candidateSize*(100+sizeRatio)/100 >= nextRunSize; the picked runs merge into
// one run whose size is their total.

// {1,1,1,1,1}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 1
// 3*(100+1)/100 > 1
// 4*(100+1)/100 > 1
// CompactionUnit != null, {1,1,1,1,1} -> {5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {5});

// {1,5}
// runs.size = 2 < maxRunNum = 5
// CompactionUnit == null, {1,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 5});

// {1,1,5}
// runs.size = 3 < maxRunNum = 5
// CompactionUnit == null, {1,1,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 5});

// {1,1,1,5}
// runs.size = 4 < maxRunNum = 5
// CompactionUnit == null, {1,1,1,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 5});

// {1,1,1,1,5}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 1
// 3*(100+1)/100 > 1
// 4*(100+1)/100 < 5 break
// CompactionUnit != null, {1,1,1,1} -> {4}, result {4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {4, 5});

// {1,4,5}
// runs.size = 3 < maxRunNum = 5
// CompactionUnit == null, {1,4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 4, 5});

// {1,1,4,5}
// runs.size = 4 < maxRunNum = 5
// CompactionUnit == null, {1,1,4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 4, 5});

// {1,1,1,4,5}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 1
// 3*(100+1)/100 < 4 break   (original comment mistakenly said "< 1")
// CompactionUnit != null, {1,1,1} -> {3}, result {3,4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {3, 4, 5});

// {1,3,4,5}
// runs.size = 4 < maxRunNum = 5
// CompactionUnit == null, {1,3,4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 3, 4, 5});

// {1,1,3,4,5}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 < 3 break   (original comment mistakenly said "< 1")
// CompactionUnit != null, {1,1} -> {2}, result {2,3,4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {2, 3, 4, 5});

// {1,2,3,4,5}
// 1*(100+1)/100 < 2 break
// CompactionUnit == null, {1,2,3,4,5}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 2, 3, 4, 5});

// {1,1,2,3,4,5} — candidate sums grow 1,2,4,7,11:
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 2
// 4*(100+1)/100 > 3
// 7*(100+1)/100 > 4
// 11*(100+1)/100 > 5
// CompactionUnit != null, {1,1,2,3,4,5} -> {16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {16});

// {1,16}
// runs.size = 2 < maxRunNum = 5
// CompactionUnit == null, {1,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 16});

// {1,1,16}
// runs.size = 3 < maxRunNum = 5
// CompactionUnit == null, {1,1,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 16});

// {1,1,1,16}
// runs.size = 4 < maxRunNum = 5
// CompactionUnit == null, {1,1,1,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 16});

// {1,1,1,1,16}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 1
// 3*(100+1)/100 > 1
// 4*(100+1)/100 < 16 break
// CompactionUnit != null, {1,1,1,1} -> {4}, result {4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {4, 16});

// {1,4,16}
// runs.size = 3 < maxRunNum = 5
// CompactionUnit == null, {1,4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 4, 16});

// {1,1,4,16}
// runs.size = 4 < maxRunNum = 5
// CompactionUnit == null, {1,1,4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 1, 4, 16});

// {1,1,1,4,16}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 1
// 3*(100+1)/100 < 4 break
// CompactionUnit != null, {1,1,1} -> {3}, result {3,4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {3, 4, 16});

// {1,3,4,16}
// runs.size = 4 < maxRunNum = 5
// CompactionUnit == null, {1,3,4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 3, 4, 16});

// {1,1,3,4,16}
// 1*(100+1)/100 > 1
// 2*(100+1)/100 < 3 break
// CompactionUnit != null, {1,1} -> {2}, result {2,3,4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {2, 3, 4, 16});

// {1,2,3,4,16}
// 1*(100+1)/100 < 2 break
// CompactionUnit == null, {1,2,3,4,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {1, 2, 3, 4, 16});

// {1,1,2,3,4,16} — candidate sums grow 1,2,4,7,11:
// 1*(100+1)/100 > 1
// 2*(100+1)/100 > 2
// 4*(100+1)/100 > 3
// 7*(100+1)/100 > 4
// 11*(100+1)/100 < 16 break
// CompactionUnit != null, {1,1,2,3,4} -> {11}, result {11,16}
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {11, 16});
}
}

MergeTreeTest

MergeTreeTest#testWriteAndReadgit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/** Tests for {@link MergeTreeReader} and {@link MergeTreeWriter}. */
public class MergeTreeTest {

@TempDir java.nio.file.Path tempDir;
private static ExecutorService service;
private FileStorePathFactory pathFactory;
private Comparator<RowData> comparator;

private MergeTreeOptions options;
private SstFileReader sstFileReader;
private SstFileWriter sstFileWriter;
private RecordWriter writer;

@BeforeAll
public static void before() {
// Single-threaded executor shared by all tests; used for async compactions.
service = Executors.newSingleThreadExecutor();
}

@AfterAll
public static void after() {
// shutdownNow interrupts any compaction still running.
service.shutdownNow();
service = null;
}

@BeforeEach
public void beforeEach() throws IOException {
pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
// Keys are compared by their first int field.
comparator = Comparator.comparingInt(o -> o.getInt(0));

// Build the merge tree (options, sst reader/writer, MergeTreeWriter)
// with a 1 MB target file size.
recreateMergeTree(1024 * 1024);

// Create the bucket directory up front so sst files can be written into it.
Path bucketDir = sstFileWriter.pathFactory().toPath("ignore").getParent();
bucketDir.getFileSystem().mkdirs(bucketDir);
}

// Rebuilds options, sst file reader/writer and the MergeTreeWriter for the
// given target file size. Keys and values are single-int rows; the avro format
// is wrapped so every record batch is flushed immediately.
private void recreateMergeTree(long targetFileSize) {
Configuration configuration = new Configuration();
configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
options = new MergeTreeOptions(configuration);
RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
FileFormat flushingAvro = new FlushingFileFormat("avro");

// Reader/writer operate on the empty partition, bucket 0.
sstFileReader =
new SstFileReader.Factory(keyType, valueType, flushingAvro, pathFactory)
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
sstFileWriter =
new SstFileWriter.Factory(
keyType,
valueType,
flushingAvro,
pathFactory,
options.targetFileSize)
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
writer = createMergeTreeWriter(Collections.emptyList());
}

// Builds a MergeTreeWriter restoring from the given existing files.
private MergeTreeWriter createMergeTreeWriter(List<SstFileMeta> files) {
// Max sequence number across the restored files; -1 when starting empty.
long maxSequenceNumber =
files.stream().map(SstFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
return new MergeTreeWriter(
new SortBufferMemTable(
sstFileWriter.keyType(),
sstFileWriter.valueType(),
options.writeBufferSize,
options.pageSize),
createCompactManager(sstFileWriter, service),
// stores all level files of merge tree
new Levels(comparator, files, options.numLevels),
maxSequenceNumber,
comparator,
// where key is primary key (unique) and value is the full record, only keep the latest one.
new DeduplicateAccumulator(),
sstFileWriter,
// when enabled, force a compaction after each commit
options.commitForceCompact);
}

// Builds the CompactManager: universal-compaction strategy plus a rewriter that
// merge-reads the picked sections and writes the result to the output level.
private CompactManager createCompactManager(
SstFileWriter sstFileWriter, ExecutorService compactExecutor) {
// Build the compaction-picking strategy.
CompactStrategy compactStrategy =
new UniversalCompaction(
options.maxSizeAmplificationPercent,
options.sizeRatio,
options.numSortedRunMax);
CompactManager.Rewriter rewriter =
(outputLevel, dropDelete, sections) ->
sstFileWriter.write(
new RecordReaderIterator(
new MergeTreeReader(
sections,
dropDelete,
sstFileReader,
comparator,
new DeduplicateAccumulator())),
outputLevel);
return new CompactManager(
compactExecutor, compactStrategy, comparator, options.targetFileSize, rewriter);
}



@Test
public void testWriteAndRead() throws Exception {

    // One batch of 200 records, plus a final round that only syncs.
    int batchNumber = 1;
    int perBatch = 200;

    List<TestRecord> expected = new ArrayList<>();
    List<SstFileMeta> newFiles = new ArrayList<>();
    Set<String> newFileNames = new HashSet<>();
    List<SstFileMeta> compactedFiles = new ArrayList<>();

    // ================================ write phase ================================
    // Each round ends with prepareCommit(); the last round writes no data and
    // instead waits for pending compactions via sync().
    for (int round = 0; round <= batchNumber; round++) {
        if (round == batchNumber) {
            writer.sync();
        } else {
            expected.addAll(writeBatch(perBatch));
        }

        // prepareCommit() flushes the mem table again and reports new files.
        Increment increment = writer.prepareCommit();
        newFiles.addAll(increment.newFiles());

        // Fold the compaction results into the sets used for assertions.
        mergeCompacted(newFileNames, compactedFiles, increment);
    }

    // ================================ read phase ================================
    // assert records from writer
    assertRecords(expected);

    // assert records from increment new files
    assertRecords(expected, newFiles, false);
    assertRecords(expected, newFiles, true);

    // assert records from increment compacted files
    assertRecords(expected, compactedFiles, true);

    // Every file left in the bucket directory must belong to some increment.
    Path bucketDir = sstFileWriter.pathFactory().toPath("ignore").getParent();
    Set<String> leftover =
            Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
                    .map(FileStatus::getPath)
                    .map(Path::getName)
                    .collect(Collectors.toSet());
    newFiles.stream().map(SstFileMeta::fileName).forEach(leftover::remove);
    compactedFiles.stream().map(SstFileMeta::fileName).forEach(leftover::remove);
    assertThat(leftover).isEqualTo(Collections.emptySet());
}

/** Generates {@code perBatch} random records, writes them, and returns them. */
private List<TestRecord> writeBatch(int perBatch) throws Exception {
    List<TestRecord> batch = generateRandom(perBatch);
    writeAll(batch);
    return batch;
}

/**
 * Generates {@code perBatch} random test records.
 *
 * <p>Keys are drawn from a range of roughly half the batch size so duplicate
 * keys (and thus merges of ADD/DELETE on the same key) are likely.
 *
 * @param perBatch number of records to generate; may be 0
 * @return the generated records in generation order
 */
private List<TestRecord> generateRandom(int perBatch) {
    Random random = new Random();
    // Random.nextInt(bound) throws IllegalArgumentException for bound <= 0,
    // so clamp the key range to at least 1 (covers perBatch == 1).
    int keyBound = Math.max(1, perBatch / 2);
    List<TestRecord> records = new ArrayList<>(perBatch);
    for (int i = 0; i < perBatch; i++) {
        records.add(
                new TestRecord(
                        random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
                        random.nextInt(keyBound),
                        random.nextInt()));
    }
    return records;
}

/**
 * Writes every record through {@code MergeTreeWriter.write()}.
 */
private void writeAll(List<TestRecord> records) throws Exception {
    for (TestRecord r : records) {
        // each record goes through the merge-tree writer individually
        writer.write(r.kind, row(r.k), row(r.v));
    }
}

/**
 * Reads the given files back and checks them against the compacted, sorted
 * expectation.
 */
private void assertRecords(
        List<TestRecord> expected, List<SstFileMeta> files, boolean dropDelete)
        throws Exception {
    List<TestRecord> actual = readAll(files, dropDelete);
    List<TestRecord> reference = compactAndSort(expected, dropDelete);
    assertThat(actual).isEqualTo(reference);
}

/**
 * Reads all key-values from the given sst files and converts them into
 * {@link TestRecord}s.
 */
private List<TestRecord> readAll(List<SstFileMeta> files, boolean dropDelete) throws Exception {

    // Partition overlapping files into sections, then merge-read them.
    RecordReader reader =
            new MergeTreeReader(
                    new IntervalPartition(files, comparator).partition(),
                    dropDelete,
                    sstFileReader,
                    comparator,
                    new DeduplicateAccumulator());

    List<TestRecord> result = new ArrayList<>();
    try (RecordReaderIterator it = new RecordReaderIterator(reader)) {
        while (it.hasNext()) {
            KeyValue kv = it.next();
            // key and value are single-int rows in this test
            result.add(
                    new TestRecord(kv.valueKind(), kv.key().getInt(0), kv.value().getInt(0)));
        }
    }
    return result;
}

/** A simple (kind, key, value) triple used to compare written and read data. */
private static class TestRecord {

    private final ValueKind kind;
    private final int k;
    private final int v;

    private TestRecord(ValueKind kind, int k, int v) {
        this.kind = kind;
        this.k = k;
        this.v = v;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TestRecord that = (TestRecord) o;
        return k == that.k && v == that.v && kind == that.kind;
    }

    @Override
    public int hashCode() {
        // equals() was overridden without hashCode(), violating the Object
        // contract (equal objects must have equal hashes) and breaking any
        // hash-based collection of TestRecords.
        int result = kind != null ? kind.hashCode() : 0;
        result = 31 * result + k;
        result = 31 * result + v;
        return result;
    }

    @Override
    public String toString() {
        return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v + '}';
    }
}

}

测试用例产生200条一批数据,到 sequenceNumber = 186 时,写 memTable 失败执行第一次 flush :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
sequenceNumber -> 0, ValueKind -> ADD, key -> +I(79), value -> +I(-1282339797)
sequenceNumber -> 1, ValueKind -> ADD, key -> +I(81), value -> +I(1179918470)
sequenceNumber -> 2, ValueKind -> DELETE, key -> +I(27), value -> +I(-1398930820)
sequenceNumber -> 3, ValueKind -> DELETE, key -> +I(48), value -> +I(-1816730153)
sequenceNumber -> 4, ValueKind -> DELETE, key -> +I(46), value -> +I(-2008842454)
sequenceNumber -> 5, ValueKind -> DELETE, key -> +I(86), value -> +I(65480438)
sequenceNumber -> 6, ValueKind -> DELETE, key -> +I(56), value -> +I(791928228)
sequenceNumber -> 7, ValueKind -> ADD, key -> +I(80), value -> +I(2011579993)
sequenceNumber -> 8, ValueKind -> ADD, key -> +I(31), value -> +I(-2035056816)
sequenceNumber -> 9, ValueKind -> ADD, key -> +I(16), value -> +I(607711095)
sequenceNumber -> 10, ValueKind -> ADD, key -> +I(6), value -> +I(-1623789565)
sequenceNumber -> 11, ValueKind -> ADD, key -> +I(8), value -> +I(-432617120)
sequenceNumber -> 12, ValueKind -> ADD, key -> +I(4), value -> +I(418530737)
sequenceNumber -> 13, ValueKind -> DELETE, key -> +I(50), value -> +I(-945438846)
sequenceNumber -> 14, ValueKind -> DELETE, key -> +I(45), value -> +I(1561651582)
sequenceNumber -> 15, ValueKind -> DELETE, key -> +I(86), value -> +I(-1087187082)
sequenceNumber -> 16, ValueKind -> ADD, key -> +I(95), value -> +I(2007713649)
sequenceNumber -> 17, ValueKind -> DELETE, key -> +I(51), value -> +I(-963275986)
sequenceNumber -> 18, ValueKind -> ADD, key -> +I(6), value -> +I(1588442994)
sequenceNumber -> 19, ValueKind -> ADD, key -> +I(12), value -> +I(-281565984)
sequenceNumber -> 20, ValueKind -> ADD, key -> +I(99), value -> +I(303373668)
sequenceNumber -> 21, ValueKind -> DELETE, key -> +I(52), value -> +I(-667201927)
sequenceNumber -> 22, ValueKind -> ADD, key -> +I(54), value -> +I(-13136015)
sequenceNumber -> 23, ValueKind -> ADD, key -> +I(92), value -> +I(977320072)
sequenceNumber -> 24, ValueKind -> ADD, key -> +I(33), value -> +I(-111372055)
sequenceNumber -> 25, ValueKind -> DELETE, key -> +I(98), value -> +I(-1109725994)
sequenceNumber -> 26, ValueKind -> ADD, key -> +I(46), value -> +I(-1950499520)
sequenceNumber -> 27, ValueKind -> ADD, key -> +I(81), value -> +I(-1088651475)
sequenceNumber -> 28, ValueKind -> DELETE, key -> +I(54), value -> +I(557220361)
sequenceNumber -> 29, ValueKind -> ADD, key -> +I(6), value -> +I(-1736811783)
sequenceNumber -> 30, ValueKind -> DELETE, key -> +I(73), value -> +I(1575145486)
sequenceNumber -> 31, ValueKind -> DELETE, key -> +I(27), value -> +I(-1649396023)
sequenceNumber -> 32, ValueKind -> DELETE, key -> +I(40), value -> +I(-597968181)
sequenceNumber -> 33, ValueKind -> DELETE, key -> +I(90), value -> +I(1162929887)
sequenceNumber -> 34, ValueKind -> ADD, key -> +I(70), value -> +I(-991165706)
sequenceNumber -> 35, ValueKind -> DELETE, key -> +I(81), value -> +I(417316419)
sequenceNumber -> 36, ValueKind -> DELETE, key -> +I(37), value -> +I(1764148187)
sequenceNumber -> 37, ValueKind -> ADD, key -> +I(34), value -> +I(-1127910879)
sequenceNumber -> 38, ValueKind -> ADD, key -> +I(77), value -> +I(-520815433)
sequenceNumber -> 39, ValueKind -> ADD, key -> +I(18), value -> +I(2051544406)
sequenceNumber -> 40, ValueKind -> ADD, key -> +I(21), value -> +I(-107422117)
sequenceNumber -> 41, ValueKind -> ADD, key -> +I(80), value -> +I(1788072570)
sequenceNumber -> 42, ValueKind -> ADD, key -> +I(67), value -> +I(-1262468430)
sequenceNumber -> 43, ValueKind -> DELETE, key -> +I(9), value -> +I(-863921982)
sequenceNumber -> 44, ValueKind -> ADD, key -> +I(77), value -> +I(-268941560)
sequenceNumber -> 45, ValueKind -> DELETE, key -> +I(39), value -> +I(705039510)
sequenceNumber -> 46, ValueKind -> ADD, key -> +I(7), value -> +I(858389285)
sequenceNumber -> 47, ValueKind -> DELETE, key -> +I(22), value -> +I(475088561)
sequenceNumber -> 48, ValueKind -> ADD, key -> +I(33), value -> +I(390088338)
sequenceNumber -> 49, ValueKind -> DELETE, key -> +I(6), value -> +I(-1991355107)
sequenceNumber -> 50, ValueKind -> DELETE, key -> +I(21), value -> +I(-1391469913)
sequenceNumber -> 51, ValueKind -> ADD, key -> +I(27), value -> +I(-1840343188)
sequenceNumber -> 52, ValueKind -> DELETE, key -> +I(90), value -> +I(838800181)
sequenceNumber -> 53, ValueKind -> DELETE, key -> +I(88), value -> +I(-1455750144)
sequenceNumber -> 54, ValueKind -> DELETE, key -> +I(44), value -> +I(1590319332)
sequenceNumber -> 55, ValueKind -> ADD, key -> +I(12), value -> +I(558741055)
sequenceNumber -> 56, ValueKind -> DELETE, key -> +I(17), value -> +I(339525532)
sequenceNumber -> 57, ValueKind -> ADD, key -> +I(8), value -> +I(485877251)
sequenceNumber -> 58, ValueKind -> DELETE, key -> +I(65), value -> +I(-1470592871)
sequenceNumber -> 59, ValueKind -> DELETE, key -> +I(88), value -> +I(1251223210)
sequenceNumber -> 60, ValueKind -> DELETE, key -> +I(84), value -> +I(-1696484617)
sequenceNumber -> 61, ValueKind -> DELETE, key -> +I(10), value -> +I(-1501259636)
sequenceNumber -> 62, ValueKind -> ADD, key -> +I(41), value -> +I(-866046253)
sequenceNumber -> 63, ValueKind -> DELETE, key -> +I(69), value -> +I(868229578)
sequenceNumber -> 64, ValueKind -> ADD, key -> +I(15), value -> +I(-1375050149)
sequenceNumber -> 65, ValueKind -> ADD, key -> +I(43), value -> +I(1250941867)
sequenceNumber -> 66, ValueKind -> ADD, key -> +I(67), value -> +I(-1754729116)
sequenceNumber -> 67, ValueKind -> DELETE, key -> +I(90), value -> +I(-476497914)
sequenceNumber -> 68, ValueKind -> DELETE, key -> +I(75), value -> +I(1225352026)
sequenceNumber -> 69, ValueKind -> DELETE, key -> +I(57), value -> +I(-1630271894)
sequenceNumber -> 70, ValueKind -> ADD, key -> +I(54), value -> +I(-1789471578)
sequenceNumber -> 71, ValueKind -> DELETE, key -> +I(56), value -> +I(1727726334)
sequenceNumber -> 72, ValueKind -> DELETE, key -> +I(6), value -> +I(1942470627)
sequenceNumber -> 73, ValueKind -> ADD, key -> +I(80), value -> +I(1019545942)
sequenceNumber -> 74, ValueKind -> DELETE, key -> +I(11), value -> +I(651497588)
sequenceNumber -> 75, ValueKind -> ADD, key -> +I(66), value -> +I(502330288)
sequenceNumber -> 76, ValueKind -> ADD, key -> +I(28), value -> +I(-1694526628)
sequenceNumber -> 77, ValueKind -> ADD, key -> +I(5), value -> +I(597994318)
sequenceNumber -> 78, ValueKind -> ADD, key -> +I(61), value -> +I(-577870746)
sequenceNumber -> 79, ValueKind -> DELETE, key -> +I(18), value -> +I(-819164947)
sequenceNumber -> 80, ValueKind -> ADD, key -> +I(27), value -> +I(-1330437890)
sequenceNumber -> 81, ValueKind -> ADD, key -> +I(52), value -> +I(2093608076)
sequenceNumber -> 82, ValueKind -> DELETE, key -> +I(69), value -> +I(-1527497908)
sequenceNumber -> 83, ValueKind -> DELETE, key -> +I(38), value -> +I(-176326148)
sequenceNumber -> 84, ValueKind -> DELETE, key -> +I(66), value -> +I(-1719225487)
sequenceNumber -> 85, ValueKind -> DELETE, key -> +I(73), value -> +I(2019867633)
sequenceNumber -> 86, ValueKind -> ADD, key -> +I(56), value -> +I(-530313746)
sequenceNumber -> 87, ValueKind -> ADD, key -> +I(91), value -> +I(740139405)
sequenceNumber -> 88, ValueKind -> ADD, key -> +I(75), value -> +I(1199728040)
sequenceNumber -> 89, ValueKind -> DELETE, key -> +I(76), value -> +I(-622761322)
sequenceNumber -> 90, ValueKind -> ADD, key -> +I(60), value -> +I(1718043906)
sequenceNumber -> 91, ValueKind -> ADD, key -> +I(50), value -> +I(-1682393888)
sequenceNumber -> 92, ValueKind -> ADD, key -> +I(58), value -> +I(1551652094)
sequenceNumber -> 93, ValueKind -> DELETE, key -> +I(91), value -> +I(-1049392752)
sequenceNumber -> 94, ValueKind -> ADD, key -> +I(11), value -> +I(246037102)
sequenceNumber -> 95, ValueKind -> DELETE, key -> +I(60), value -> +I(-816180426)
sequenceNumber -> 96, ValueKind -> DELETE, key -> +I(83), value -> +I(-1623000942)
sequenceNumber -> 97, ValueKind -> ADD, key -> +I(0), value -> +I(964697602) // 排序集合中的第 1 条数据
sequenceNumber -> 98, ValueKind -> DELETE, key -> +I(64), value -> +I(-194759233)
sequenceNumber -> 99, ValueKind -> DELETE, key -> +I(56), value -> +I(-1374078600)
sequenceNumber -> 100, ValueKind -> ADD, key -> +I(44), value -> +I(198351328)
sequenceNumber -> 101, ValueKind -> ADD, key -> +I(44), value -> +I(-1998891159)
sequenceNumber -> 102, ValueKind -> ADD, key -> +I(64), value -> +I(75726825)
sequenceNumber -> 103, ValueKind -> DELETE, key -> +I(86), value -> +I(1354212903)
sequenceNumber -> 104, ValueKind -> ADD, key -> +I(44), value -> +I(-508548447)
sequenceNumber -> 105, ValueKind -> ADD, key -> +I(78), value -> +I(-1504299996)
sequenceNumber -> 106, ValueKind -> ADD, key -> +I(36), value -> +I(1967516225)
sequenceNumber -> 107, ValueKind -> ADD, key -> +I(43), value -> +I(-1448527484)
sequenceNumber -> 108, ValueKind -> DELETE, key -> +I(80), value -> +I(422507492)
sequenceNumber -> 109, ValueKind -> DELETE, key -> +I(70), value -> +I(19710019)
sequenceNumber -> 110, ValueKind -> DELETE, key -> +I(30), value -> +I(1060594873)
sequenceNumber -> 111, ValueKind -> ADD, key -> +I(92), value -> +I(645029376)
sequenceNumber -> 112, ValueKind -> ADD, key -> +I(44), value -> +I(-500311443)
sequenceNumber -> 113, ValueKind -> DELETE, key -> +I(96), value -> +I(-1664755054)
sequenceNumber -> 114, ValueKind -> ADD, key -> +I(86), value -> +I(488472411)
sequenceNumber -> 115, ValueKind -> DELETE, key -> +I(42), value -> +I(-1960475417)
sequenceNumber -> 116, ValueKind -> DELETE, key -> +I(71), value -> +I(-1040436146)
sequenceNumber -> 117, ValueKind -> DELETE, key -> +I(52), value -> +I(-350114843)
sequenceNumber -> 118, ValueKind -> ADD, key -> +I(31), value -> +I(1415574280)
sequenceNumber -> 119, ValueKind -> DELETE, key -> +I(64), value -> +I(-799816888)
sequenceNumber -> 120, ValueKind -> DELETE, key -> +I(64), value -> +I(332753861)
sequenceNumber -> 121, ValueKind -> DELETE, key -> +I(8), value -> +I(361512102)
sequenceNumber -> 122, ValueKind -> ADD, key -> +I(7), value -> +I(155422065)
sequenceNumber -> 123, ValueKind -> DELETE, key -> +I(80), value -> +I(289053479)
sequenceNumber -> 124, ValueKind -> ADD, key -> +I(92), value -> +I(-721556651)
sequenceNumber -> 125, ValueKind -> DELETE, key -> +I(35), value -> +I(1458427036)
sequenceNumber -> 126, ValueKind -> ADD, key -> +I(86), value -> +I(-49406890)
sequenceNumber -> 127, ValueKind -> DELETE, key -> +I(93), value -> +I(-140821273)
sequenceNumber -> 128, ValueKind -> DELETE, key -> +I(52), value -> +I(-62714072)
sequenceNumber -> 129, ValueKind -> DELETE, key -> +I(25), value -> +I(664998142)
sequenceNumber -> 130, ValueKind -> DELETE, key -> +I(36), value -> +I(-1222167553)
sequenceNumber -> 131, ValueKind -> DELETE, key -> +I(59), value -> +I(884329714)
sequenceNumber -> 132, ValueKind -> DELETE, key -> +I(52), value -> +I(431167396)
sequenceNumber -> 133, ValueKind -> DELETE, key -> +I(82), value -> +I(-1503470408)
sequenceNumber -> 134, ValueKind -> ADD, key -> +I(71), value -> +I(-1866273544)
sequenceNumber -> 135, ValueKind -> ADD, key -> +I(36), value -> +I(-1348634198)
sequenceNumber -> 136, ValueKind -> DELETE, key -> +I(95), value -> +I(-1877911221)
sequenceNumber -> 137, ValueKind -> DELETE, key -> +I(68), value -> +I(1990244092)
sequenceNumber -> 138, ValueKind -> DELETE, key -> +I(52), value -> +I(1272509713)
sequenceNumber -> 139, ValueKind -> ADD, key -> +I(63), value -> +I(549481877)
sequenceNumber -> 140, ValueKind -> DELETE, key -> +I(30), value -> +I(-1827265063)
sequenceNumber -> 141, ValueKind -> ADD, key -> +I(17), value -> +I(961410061)
sequenceNumber -> 142, ValueKind -> ADD, key -> +I(42), value -> +I(-1923379307)
sequenceNumber -> 143, ValueKind -> DELETE, key -> +I(66), value -> +I(-447682108)
sequenceNumber -> 144, ValueKind -> DELETE, key -> +I(88), value -> +I(-583980392)
sequenceNumber -> 145, ValueKind -> DELETE, key -> +I(1), value -> +I(2065491016)
sequenceNumber -> 146, ValueKind -> ADD, key -> +I(0), value -> +I(521101514) // 排序集合中的第 2 条数据
sequenceNumber -> 147, ValueKind -> ADD, key -> +I(30), value -> +I(1132139652)
sequenceNumber -> 148, ValueKind -> ADD, key -> +I(72), value -> +I(713572359)
sequenceNumber -> 149, ValueKind -> DELETE, key -> +I(5), value -> +I(162860404)
sequenceNumber -> 150, ValueKind -> DELETE, key -> +I(36), value -> +I(-2108809046)
sequenceNumber -> 151, ValueKind -> DELETE, key -> +I(3), value -> +I(104679497)
sequenceNumber -> 152, ValueKind -> ADD, key -> +I(15), value -> +I(1452009131)
sequenceNumber -> 153, ValueKind -> ADD, key -> +I(81), value -> +I(-639638965)
sequenceNumber -> 154, ValueKind -> DELETE, key -> +I(62), value -> +I(1906544864)
sequenceNumber -> 155, ValueKind -> ADD, key -> +I(33), value -> +I(-175182120)
sequenceNumber -> 156, ValueKind -> DELETE, key -> +I(89), value -> +I(452324661)
sequenceNumber -> 157, ValueKind -> ADD, key -> +I(80), value -> +I(836346953)
sequenceNumber -> 158, ValueKind -> ADD, key -> +I(99), value -> +I(651514373)
sequenceNumber -> 159, ValueKind -> DELETE, key -> +I(71), value -> +I(-108776269)
sequenceNumber -> 160, ValueKind -> DELETE, key -> +I(50), value -> +I(-73532459)
sequenceNumber -> 161, ValueKind -> ADD, key -> +I(14), value -> +I(2128763828)
sequenceNumber -> 162, ValueKind -> DELETE, key -> +I(67), value -> +I(-456952804)
sequenceNumber -> 163, ValueKind -> ADD, key -> +I(43), value -> +I(-1249173302)
sequenceNumber -> 164, ValueKind -> DELETE, key -> +I(42), value -> +I(-1216151330)
sequenceNumber -> 165, ValueKind -> DELETE, key -> +I(84), value -> +I(-500755987)
sequenceNumber -> 166, ValueKind -> DELETE, key -> +I(10), value -> +I(-1947673013)
sequenceNumber -> 167, ValueKind -> DELETE, key -> +I(95), value -> +I(-1342645152)
sequenceNumber -> 168, ValueKind -> ADD, key -> +I(85), value -> +I(1979169435)
sequenceNumber -> 169, ValueKind -> ADD, key -> +I(80), value -> +I(157530613)
sequenceNumber -> 170, ValueKind -> DELETE, key -> +I(50), value -> +I(743959004)
sequenceNumber -> 171, ValueKind -> ADD, key -> +I(35), value -> +I(170145595)
sequenceNumber -> 172, ValueKind -> ADD, key -> +I(2), value -> +I(-464790927)
sequenceNumber -> 173, ValueKind -> ADD, key -> +I(15), value -> +I(-1317899453)
sequenceNumber -> 174, ValueKind -> ADD, key -> +I(74), value -> +I(1008782452)
sequenceNumber -> 175, ValueKind -> ADD, key -> +I(51), value -> +I(-763579376)
sequenceNumber -> 176, ValueKind -> DELETE, key -> +I(0), value -> +I(500333878) // 排序集合中的第 3 条数据
sequenceNumber -> 177, ValueKind -> ADD, key -> +I(48), value -> +I(1184699077)
sequenceNumber -> 178, ValueKind -> DELETE, key -> +I(67), value -> +I(-1805865582)
sequenceNumber -> 179, ValueKind -> DELETE, key -> +I(60), value -> +I(-861165267)
sequenceNumber -> 180, ValueKind -> ADD, key -> +I(10), value -> +I(-888539951)
sequenceNumber -> 181, ValueKind -> ADD, key -> +I(62), value -> +I(1848778293)
sequenceNumber -> 182, ValueKind -> ADD, key -> +I(48), value -> +I(-1260289205)
sequenceNumber -> 183, ValueKind -> DELETE, key -> +I(28), value -> +I(1880572265)
sequenceNumber -> 184, ValueKind -> DELETE, key -> +I(52), value -> +I(-1507320045)
sequenceNumber -> 185, ValueKind -> DELETE, key -> +I(19), value -> +I(14067102)
sequenceNumber -> 186, ValueKind -> DELETE, key -> +I(77), value -> +I(1177031237)

经过 QuickSort.sort(buffer) 排序如下,按照 key, sequenceNumber 排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sequenceNumber -> 97, ValueKind -> ADD, key -> +I(0), value -> +I(964697602)        
sequenceNumber -> 146, ValueKind -> ADD, key -> +I(0), value -> +I(521101514)
sequenceNumber -> 176, ValueKind -> DELETE, key -> +I(0), value -> +I(500333878) // memTable.iterator().next() 返回的第 1 条数据
sequenceNumber -> 145, ValueKind -> DELETE, key -> +I(1), value -> +I(2065491016) // memTable.iterator().next() 返回的第 2 条数据
sequenceNumber -> 172, ValueKind -> ADD, key -> +I(2), value -> +I(-464790927) // memTable.iterator().next() 返回的第 3 条数据
sequenceNumber -> 151, ValueKind -> DELETE, key -> +I(3), value -> +I(104679497) // memTable.iterator().next() 返回的第 4 条数据
sequenceNumber -> 12, ValueKind -> ADD, key -> +I(4), value -> +I(418530737) // memTable.iterator().next() 返回的第 5 条数据
sequenceNumber -> 77, ValueKind -> ADD, key -> +I(5), value -> +I(597994318)
sequenceNumber -> 149, ValueKind -> DELETE, key -> +I(5), value -> +I(162860404) // memTable.iterator().next() 返回的第 6 条数据
sequenceNumber -> 10, ValueKind -> ADD, key -> +I(6), value -> +I(-1623789565)
sequenceNumber -> 18, ValueKind -> ADD, key -> +I(6), value -> +I(1588442994)
sequenceNumber -> 29, ValueKind -> ADD, key -> +I(6), value -> +I(-1736811783)
sequenceNumber -> 49, ValueKind -> DELETE, key -> +I(6), value -> +I(-1991355107)
sequenceNumber -> 72, ValueKind -> DELETE, key -> +I(6), value -> +I(1942470627) // memTable.iterator().next() 返回的第 7 条数据
sequenceNumber -> 46, ValueKind -> ADD, key -> +I(7), value -> +I(858389285)
sequenceNumber -> 122, ValueKind -> ADD, key -> +I(7), value -> +I(155422065) // memTable.iterator().next() 返回的第 8 条数据

... 后面就不一一列举了

MergeTreeWriter.flush() 操作是从 memTable.iterator() 迭代读取数据,通过 SstFileWriter.write() 写入 bucket-0/sst-* 文件 。

关键实现

MergeTreeWriter

MergeTreeWriter.javagit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/** A {@link RecordWriter} to write records and generate {@link Increment}. */
public class MergeTreeWriter implements RecordWriter {

/**
* In-memory buffer for incoming records; flushed to level-0 sst files when full.
*/
private final MemTable memTable;

/**
* Submits and finishes asynchronous compaction tasks.
*/
private final CompactManager compactManager;

/** Stores all level files of the merge tree. */
private final Levels levels;

/**
* Comparator over record keys.
*/
private final Comparator<RowData> keyComparator;

/**
* Merges values that share the same key (used here for deduplication).
*/
private final Accumulator accumulator;

private final SstFileWriter sstFileWriter;

/** Whether prepareCommit() should also wait for the pending compaction. */
private final boolean commitForceCompact;

/** Files created by flushes since the last drainIncrement(). */
private final LinkedHashSet<SstFileMeta> newFiles;

/** Files consumed by compactions since the last drainIncrement(), keyed by file name. */
private final LinkedHashMap<String, SstFileMeta> compactBefore;

/** Files produced by compactions since the last drainIncrement(). */
private final LinkedHashSet<SstFileMeta> compactAfter;

/** Next sequence number to assign to an incoming record. */
private long newSequenceNumber;

private long newSequenceNumber() {
return newSequenceNumber++;
}

@VisibleForTesting
Levels levels() {
return levels;
}


/**
* Accepts one record and stores it in the mem table, flushing first if full.
*/
@Override
public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {

// Assign a fresh, monotonically increasing sequence number.
long sequenceNumber = newSequenceNumber();

// Try to append the record to the mem table.
boolean success = memTable.put(sequenceNumber, valueKind, key, value);

if (!success) {

// The mem table could not obtain another memory segment; flush it to disk.
flush();

// Retry the same record (same sequence number) against the now-empty mem table.
success = memTable.put(sequenceNumber, valueKind, key, value);
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}


/**
* Drains the mem table and writes its merged contents to new level-0 sst files.
*/
private void flush() throws Exception {

if (memTable.size() > 0) {

finishCompaction();

// Iterate the sorted mem table; for equal keys the record with the largest
// sequence number wins.
Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);

// Write the drained records as sst files at level 0.
List<SstFileMeta> files =
sstFileWriter.write(CloseableIterator.adapterForIterator(iterator), 0);
newFiles.addAll(files);

// Register the new files in level 0 of the merge tree.
files.forEach(levels::addLevel0File);

// Reset the mem table for subsequent writes.
memTable.clear();

// Submit an asynchronous LSM compaction to the executor.
submitCompaction();
}
}

private void finishCompaction() throws ExecutionException, InterruptedException {

// Wait for the previously submitted compaction (if any) to complete.
Optional<CompactManager.CompactResult> result = compactManager.finishCompaction(levels);

// Remember the before/after file sets for the next Increment.
result.ifPresent(this::updateCompactResult);
}

private void submitCompaction() {
compactManager.submitCompaction(levels);
}


/**
* Flushes outstanding data and returns the {@link Increment} to commit.
*/
@Override
public Increment prepareCommit() throws Exception {

// Flush whatever is still buffered in the mem table.
flush();

if (commitForceCompact) {
finishCompaction();
}

// Build the increment and clear newFiles / compactBefore / compactAfter.
return drainIncrement();
}

private Increment drainIncrement() {
Increment increment =
new Increment(
new ArrayList<>(newFiles),
new ArrayList<>(compactBefore.values()),
new ArrayList<>(compactAfter));
newFiles.clear();
compactBefore.clear();
compactAfter.clear();
return increment;
}


/**
* Blocks until the pending compaction (if any) has finished.
*/
@Override
public void sync() throws Exception {
finishCompaction();
}

}

SortBufferMemTable

SortBufferMemTable.javagit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/** A {@link MemTable} which stores records in {@link BinaryInMemorySortBuffer}. */
public class SortBufferMemTable implements MemTable {

private final RowType keyType;
private final RowType valueType;
private final KeyValueSerializer serializer;

/**
* In-memory sort buffer backing this mem table.
*/
private final BinaryInMemorySortBuffer buffer;

@Override
public boolean put(long sequenceNumber, ValueKind valueKind, RowData key, RowData value) throws IOException {
// Returns false when the buffer cannot hold the row, signalling the caller to flush.
return buffer.write(serializer.toRow(key, sequenceNumber, valueKind, value));
}

@Override
public Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, Accumulator accumulator) {
// Sort in place; the number of buffered rows does not change.
new QuickSort().sort(buffer);
MutableObjectIterator<BinaryRowData> kvIter = buffer.getIterator();
return new MemTableIterator(kvIter, keyComparator, accumulator);
}


/**
* Iterator over the sorted buffer that merges consecutive rows with equal keys.
*/
private class MemTableIterator implements Iterator<KeyValue> {

private final MutableObjectIterator<BinaryRowData> kvIter;
private final Comparator<RowData> keyComparator;
private final Accumulator accumulator;

// holds the accumulated value
private KeyValueSerializer previous;
private BinaryRowData previousRow;
// reads the next kv
private KeyValueSerializer current;
private BinaryRowData currentRow;

/**
* Whether the iterator has already advanced to the next merged entry.
*/
private boolean advanced;


private MemTableIterator(
MutableObjectIterator<BinaryRowData> kvIter,
Comparator<RowData> keyComparator,
Accumulator accumulator) {
this.kvIter = kvIter;
this.keyComparator = keyComparator;
this.accumulator = accumulator;

// Row layout: key fields + (sequenceNumber, valueKind) + value fields.
int totalFieldCount = keyType.getFieldCount() + 2 + valueType.getFieldCount();
this.previous = new KeyValueSerializer(keyType, valueType);
this.previousRow = new BinaryRowData(totalFieldCount);
this.current = new KeyValueSerializer(keyType, valueType);
this.currentRow = new BinaryRowData(totalFieldCount);
// Pre-read the first row so advanceIfNeeded() can start by swapping.
readOnce();
this.advanced = false;
}

@Override
public boolean hasNext() {
advanceIfNeeded();
return previousRow != null;
}

@Override
public KeyValue next() {
advanceIfNeeded();
if (previousRow == null) {
return null;
}
advanced = false;
return previous.getReusedKv();
}

private void advanceIfNeeded() {
if (advanced) {
return;
}
advanced = true;

RowData result;
do {

// Swap current and previous: "previous" now holds the most recently read row.
swapSerializers();
if (previousRow == null) {
return;
}
accumulator.reset();
accumulator.add(previous.getReusedKv().value());

// readOnce() pulls the next row from the sorted buffer.
while (readOnce()) {

// Stop as soon as the next row has a different key than "previous".
if (keyComparator.compare(
previous.getReusedKv().key(), current.getReusedKv().key())
!= 0) {
break;
}

// Same key: fold the later row (larger sequence number) into the accumulator.
accumulator.add(current.getReusedKv().value());

// Advance past the consumed row.
swapSerializers();
}

// null means this key produced no output; keep merging the next key.
result = accumulator.getValue();
} while (result == null);

// Expose the merged value through "previous".
previous.getReusedKv().setValue(result);
}

private boolean readOnce() {
try {

// Fetch the next row from the buffer; null at the end.
currentRow = kvIter.next(currentRow);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (currentRow != null) {
current.fromRow(currentRow);
}
return currentRow != null;
}

/**
* Swaps the previous and current serializer/row pairs.
*/
private void swapSerializers() {
KeyValueSerializer tmp = previous;
BinaryRowData tmpRow = previousRow;
previous = current;
previousRow = currentRow;
current = tmp;
currentRow = tmpRow;
}
}

@Override
public int size() {
return buffer.size();
}

@Override
public void clear() {
buffer.reset();
}

}

CompactManager

CompactManager.javagit 地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/** Manager to submit compaction task. */
public class CompactManager {
private final ExecutorService executor;

private final CompactStrategy strategy;

private final Comparator<RowData> keyComparator;

private final long minFileSize;

private final Rewriter rewriter;

private Future<CompactResult> taskFuture;

/** Submit a new compaction task. */
public void submitCompaction(Levels levels) {

// finishCompaction() resets taskFuture to null; a non-null value means the
// previous compaction has not been finished yet.
if (taskFuture != null) {
throw new IllegalStateException(
"Please finish the previous compaction before submitting new one.");
}

strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
.ifPresent(

// Hand the picked CompactUnit to the executor.
unit -> {
if (unit.files().size() < 2) {
return;
}
/*
* As long as there is no older data, We can drop the deletion.
* If the output level is 0, there may be older data not involved in compaction.
* If the output level is bigger than 0, as long as there is no older data in
* the current levels, the output is the oldest, so we can drop the deletion.
* See CompactStrategy.pick.
*/
boolean dropDelete =
unit.outputLevel() != 0
&& unit.outputLevel() >= levels.nonEmptyHighestLevel();

CompactTask task = new CompactTask(unit, dropDelete);
taskFuture = executor.submit(task);
});
}

/** Finish current task, and update result files to {@link Levels}. */
public Optional<CompactResult> finishCompaction(Levels levels)
throws ExecutionException, InterruptedException {

if (taskFuture != null) {

// Block on the Future until the CompactTask has produced its result.
CompactResult result = taskFuture.get();

// Replace the compacted input files with the output files in the levels.
levels.update(result.before(), result.after());

// Mark that no compaction is running so a new one may be submitted.
taskFuture = null;
return Optional.of(result);
}
return Optional.empty();
}

// --------------------------------------------------------------------------------------------
// Internal classes
// --------------------------------------------------------------------------------------------

/** Compaction task. */
private class CompactTask implements Callable<CompactResult> {

private final int outputLevel;

private final List<List<SortedRun>> partitioned;

private final boolean dropDelete;

private CompactTask(CompactUnit unit, boolean dropDelete) {
this.outputLevel = unit.outputLevel();
// Partition the unit's files into sections of key-overlapping sorted runs.
this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
this.dropDelete = dropDelete;
}

@Override
public CompactResult call() throws Exception {
// Runs on the CompactManager's executor thread.
return compact();
}

private CompactResult compact() throws Exception {
List<List<SortedRun>> candidate = new ArrayList<>();
List<SstFileMeta> before = new ArrayList<>();
List<SstFileMeta> after = new ArrayList<>();

// Checking the order and compacting adjacent and contiguous files
// Note: can't skip an intermediate file to compact, this will destroy the overall
// orderliness
for (List<SortedRun> section : partitioned) {
if (section.size() > 1) {
candidate.add(section);
} else {
SortedRun run = section.get(0);
// No overlapping:
// We can just upgrade the large file and just change the level instead of
// rewriting it
// But for small files, we will try to compact it
for (SstFileMeta file : run.files()) {
if (file.fileSize() < minFileSize) {
// Smaller files are rewritten along with the previous files
candidate.add(singletonList(SortedRun.fromSingle(file)));
} else {
// Large file appear, rewrite previous and upgrade it
rewrite(candidate, before, after);
upgrade(file, before, after);
}
}
}
}
rewrite(candidate, before, after);
return result(before, after);
}

private void upgrade(SstFileMeta file, List<SstFileMeta> before, List<SstFileMeta> after) {
if (file.level() != outputLevel) {
before.add(file);
after.add(file.upgrade(outputLevel));
}
}

private void rewrite(
List<List<SortedRun>> candidate, List<SstFileMeta> before, List<SstFileMeta> after)
throws Exception {
if (candidate.isEmpty()) {
return;
}
if (candidate.size() == 1) {
List<SortedRun> section = candidate.get(0);
if (section.size() == 0) {
return;
} else if (section.size() == 1) {
for (SstFileMeta file : section.get(0).files()) {
upgrade(file, before, after);
}
candidate.clear();
return;
}
}
candidate.forEach(runs -> runs.forEach(run -> before.addAll(run.files())));
after.addAll(rewriter.rewrite(outputLevel, dropDelete, candidate));
candidate.clear();
}

private CompactResult result(List<SstFileMeta> before, List<SstFileMeta> after) {
return new CompactResult() {
@Override
public List<SstFileMeta> before() {
return before;
}

@Override
public List<SstFileMeta> after() {
return after;
}
};
}
}
}

参考

lsm-trie git
LSM-trie_An_LSM-tree-based_Ultra-Large_Key-Value_Store_for_Small_Data.pdf
Skip_Lists_A_Probabilistic_Alternative_to_Balanced_Trees.pdf
The_Log-Structured_Merge-Tree.pdf
WiscKey_Separating_Keys_from_Values_in_SSD-conscious_Storage.pdf
浅析 LSM-tree
Designing Data-Intensive Applications 分享视频
《精通LevelDB》 2022年1月出版