Flink DML SQL with Calcite

The article Flink-DDL-SQL-With-Calcite covered the engineering implementation of flink-sql-parser and the details of SQL parsing and SQL validation.
This article focuses on how Calcite optimizes DML SQL queries.

Query

Test case org.apache.flink.table.planner.plan.stream.sql.TableScanTest#testEventTimeTemporalJoinOnUpsertSource:

CREATE TABLE orders (
  amount BIGINT,
  currency STRING,
  rowtime TIMESTAMP(3),
  WATERMARK FOR rowtime AS rowtime
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I'
);

CREATE TABLE rates_history (
  currency STRING PRIMARY KEY NOT ENFORCED,
  rate BIGINT,
  rowtime TIMESTAMP(3),
  WATERMARK FOR rowtime AS rowtime
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'UA,D',
  'disable-lookup' = 'true'
);

SELECT o.currency, o.amount, r.rate, o.amount * r.rate
FROM orders AS o
LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.rowtime AS r
  ON o.currency = r.currency;
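The query is an event-time temporal join: each orders row is matched with the version of rates_history that was valid at o.rowtime, and LEFT JOIN keeps orders for which no version exists. The pure-Python sketch below models only these result semantics, assuming both inputs fit in memory as complete lists and that a version is valid from its own rowtime onwards; Flink's streaming operator instead works incrementally with state and watermarks. The function name and the tuple layout are hypothetical.

```python
from bisect import bisect_right
from collections import defaultdict


def temporal_left_join(orders, rate_versions):
    """Join each order with the rate version valid at the order's rowtime.

    orders: list of (amount, currency, rowtime) tuples.
    rate_versions: list of (currency, rate, rowtime) tuples; rate=None marks
    a delete of that currency at that time.
    Returns (currency, amount, rate, amount * rate) rows; rate and the product
    are None when no version is valid (LEFT JOIN semantics).
    """
    # Group versions by primary key and sort each history by version time.
    grouped = defaultdict(list)
    for currency, rate, ts in rate_versions:
        grouped[currency].append((ts, rate))
    history = {}
    for currency, versions in grouped.items():
        versions.sort(key=lambda v: v[0])
        history[currency] = ([ts for ts, _ in versions], [r for _, r in versions])

    result = []
    for amount, currency, ts in orders:
        times, rates = history.get(currency, ([], []))
        # Index of the latest version whose time is <= the order's rowtime.
        idx = bisect_right(times, ts) - 1
        rate = rates[idx] if idx >= 0 else None
        product = amount * rate if rate is not None else None
        result.append((currency, amount, rate, product))
    return result
```

For example, with Euro versions at times 1 (114) and 5 (116), an order at time 2 picks 114 and an order at time 5 picks 116.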

SqlNode -> RelNode: statement execution flow

Parsing, conversion and execution flow of the query statement:


org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql

// Parse the SQL
org.apache.flink.table.planner.delegation.ParserImpl#parse
org.apache.flink.table.planner.parse.CalciteParser#parseSqlList
...
org.apache.flink.table.planner.operations.SqlToOperationConverter#toQueryOperation
org.apache.flink.table.planner.calcite.FlinkPlannerImpl#rel
org.apache.calcite.sql2rel.SqlToRelConverter#convertQuery hands the query over to Calcite
org.apache.calcite.sql2rel.SqlToRelConverter#convertQueryRecursive recursive conversion
org.apache.calcite.sql2rel.SqlToRelConverter#convertFrom
org.apache.calcite.sql2rel.SqlToRelConverter#convertIdentifier source table name
org.apache.calcite.sql.validate.SqlValidatorUtil#getRelOptTable
*org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader#getTable
org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader#toPreparingTable returns a CatalogSourceTable, which is a RelOptTable implementation
org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader#convertCatalogTable creates the CatalogSourceTable
org.apache.calcite.sql2rel.SqlToRelConverter#toRel
org.apache.flink.table.planner.plan.schema.CatalogSourceTable#toRel
org.apache.flink.table.planner.connectors.DynamicSourceUtils#convertSourceToRel
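Stripped of the Flink and Calcite plumbing, the trace builds the RelNode tree bottom-up: the FROM identifier is resolved through the catalog reader (getTable yields a CatalogSourceTable whose toRel produces the scan), and the remaining clauses are stacked on top of it. The toy converter below illustrates only that shape for a single-table SELECT ... WHERE; the dict-based AST, the Rel class and the catalog dictionary are hypothetical stand-ins, not Calcite classes.

```python
from dataclasses import dataclass, field


@dataclass
class Rel:
    """Hypothetical relational node: operator name, attributes and inputs."""
    op: str
    attrs: dict
    inputs: list = field(default_factory=list)


def convert_query(ast, catalog):
    """Toy SqlNode -> RelNode conversion for SELECT <cols> FROM <table> [WHERE].

    ast: {"select": [column names], "from": table name, "where": condition or None}
    catalog: {table name: [column names]}
    """
    # FROM: resolve the identifier through the catalog, then build the scan.
    table = ast["from"]
    if table not in catalog:
        raise KeyError(f"Object '{table}' not found")
    columns = catalog[table]
    rel = Rel("LogicalTableScan", {"table": table})
    # WHERE: stacked on top of the scan as a filter.
    if ast.get("where") is not None:
        rel = Rel("LogicalFilter", {"condition": ast["where"]}, [rel])
    # SELECT: a projection that references input fields by position ($i).
    exprs = [f"${columns.index(name)}" for name in ast["select"]]
    return Rel("LogicalProject", {"exprs": exprs}, [rel])
```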

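The initially generated RelNode is shown in the figure:

Optimization

The construction of the Flink optimization programs and the rules they use is shown in the figure:

Optimization log: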
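The optimization log prints one "optimize <name> cost <n> ms." line per program, always in the same order. A minimal sketch of such a chain, assuming each program is simply a function from plan to plan; in the Flink planner this role is played by FlinkChainedProgram (assembled in FlinkStreamProgram), where every entry wraps its own rule set or sub-optimizer. ChainedProgram, add_last and the phase list are illustrative only; the names are copied from the log printed in this article.

```python
import time

# Phase names in the order printed by the optimization log in this article.
STREAM_PHASES = [
    "subquery_rewrite", "temporal_join_rewrite", "decorrelate",
    "default_rewrite", "predicate_pushdown", "project_rewrite",
    "logical", "logical_rewrite", "time_indicator",
    "physical", "physical_rewrite",
]


class ChainedProgram:
    """Hypothetical chain that runs named plan -> plan programs in order."""

    def __init__(self):
        self.programs = []  # list of (name, program) pairs

    def add_last(self, name, program):
        self.programs.append((name, program))

    def optimize(self, plan, log=print):
        for name, program in self.programs:
            start = time.perf_counter()
            plan = program(plan)  # each phase sees the previous phase's output
            cost_ms = int((time.perf_counter() - start) * 1000)
            log(f"optimize {name} cost {cost_ms} ms.")
        return plan
```

Keeping the phases as an ordered list makes the contract explicit: a phase such as physical can rely on logical having already produced FlinkLogical* nodes.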


origin:
LogicalProject(currency=[$1], amount=[$0], rate=[$4], EXPR$3=[*($0, $4)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1, 2}])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalFilter(condition=[=($cor0.currency, $0)])
      +- LogicalSnapshot(period=[$cor0.rowtime])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize subquery_rewrite cost 36 ms.
optimize result:
LogicalProject(exprs=[[$1, $0, $4, *($0, $4)]])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1, 2}])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalFilter(condition=[=($cor0.currency, $0)])
      +- LogicalSnapshot(period=[$cor0.rowtime])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize temporal_join_rewrite cost 31 ms.
optimize result:
LogicalProject(exprs=[[$1, $0, $4, *($0, $4)]])
+- LogicalJoin(condition=[AND(=($1, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalSnapshot(period=[$cor0.rowtime])
      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize decorrelate cost 12 ms.
optimize result:
LogicalProject(exprs=[[$1, $0, $4, *($0, $4)]])
+- LogicalJoin(condition=[AND(=($1, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalSnapshot(period=[$cor0.rowtime])
      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize default_rewrite cost 98 ms.
optimize result:
LogicalProject(exprs=[[$1, $0, $4, *($0, $4)]])
+- LogicalJoin(condition=[AND(=($1, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalSnapshot(period=[$cor0.rowtime])
      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize predicate_pushdown cost 41 ms.
optimize result:
LogicalProject(exprs=[[$1, $0, $4, *($0, $4)]])
+- LogicalJoin(condition=[AND(=($1, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalSnapshot(period=[$cor0.rowtime])
      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize project_rewrite cost 4 ms.
optimize result:
LogicalProject(exprs=[[$1, $0, $4, *($0, $4)]])
+- LogicalJoin(condition=[AND(=($1, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
   +- LogicalSnapshot(period=[$cor0.rowtime])
      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- LogicalTableScan(table=[[default_catalog, default_database, rates_history]])

optimize logical cost 840 ms.
optimize result:
FlinkLogicalCalc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3])
+- FlinkLogicalJoin(condition=[AND(=($1, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.rowtime])
      +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime])

optimize logical_rewrite cost 126 ms.
optimize result:
FlinkLogicalCalc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3])
+- FlinkLogicalJoin(condition=[AND(=($1, $3), __TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($3), __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
   :- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
   :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.rowtime])
      +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime])

optimize time_indicator cost 17 ms.
optimize result:
FlinkLogicalCalc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3])
+- FlinkLogicalCalc(select=[amount, currency, rowtime, currency0, rate, CAST(rowtime0 AS TIMESTAMP(3)) AS rowtime0])
   +- FlinkLogicalJoin(condition=[AND(=($1, $3), __TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($3), __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[left])
      :- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency, rowtime])
      +- FlinkLogicalSnapshot(period=[$cor0.rowtime])
         +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime])

optimize physical cost 516 ms.
optimize result:
Calc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(currency, currency0), __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), __TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], select=[amount, currency, rowtime, currency0, rate, rowtime0])
   :- Exchange(distribution=[hash[currency]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
   :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency, rowtime])
   +- Exchange(distribution=[hash[currency]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
         +- ChangelogNormalize(key=[currency])
            +- Exchange(distribution=[hash[currency]])
               +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime])

optimize physical_rewrite cost 48 ms.
optimize result:
Calc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(currency, currency0), __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), __TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], select=[amount, currency, rowtime, currency0, rate, rowtime0])
   :- Exchange(distribution=[hash[currency]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
   :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency, rowtime])
   +- Exchange(distribution=[hash[currency]])
      +- ChangelogNormalize(key=[currency])
         +- Exchange(distribution=[hash[currency]])
            +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
               +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime])

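subquery_rewrite before/after comparison:

logical before/after comparison:

logical_rewrite before/after comparison:

time_indicator before/after comparison:

physical before/after comparison:

physical_rewrite before/after comparison: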

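After logical and physical optimization, the initial RelNode tree becomes a Flink physical execution plan made of FlinkPhysicalRel nodes.
Here, FlinkLogicalCalc combines the Project and Filter operations, and StreamExecExchange extends Calcite's Exchange and marks a hash redistribution of the data (the Exchange(distribution=[hash[currency]]) nodes in the plan).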
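To make the two operators concrete: a Calc applies its condition and its projection in a single pass over each row, and a hash Exchange sends every row to the parallel channel derived from the hash of its key, so all rows with the same key meet in the same downstream instance. The sketch below models both on plain tuples; zlib.crc32 with a modulo is an arbitrary deterministic choice for illustration, not the hashing Flink uses to assign keys, and the function names are hypothetical. The example mirrors Calc(select=[name, pv, rowtime], where=[AND(=(channel, 'channel1'), >(pv, 0))]) from the INSERT plan later in this article.

```python
import zlib


def calc(rows, fields, select, where=None):
    """Toy Calc: optional filter followed by projection, in one pass.

    rows: list of tuples; fields: input field names;
    select: output field names; where: predicate over a dict row, or None.
    """
    out = []
    for row in rows:
        record = dict(zip(fields, row))
        if where is None or where(record):
            out.append(tuple(record[name] for name in select))
    return out


def hash_exchange(rows, key_index, parallelism):
    """Toy hash distribution: rows with equal keys land in the same channel."""
    channels = [[] for _ in range(parallelism)]
    for row in rows:
        key = str(row[key_index]).encode("utf-8")
        channels[zlib.crc32(key) % parallelism].append(row)
    return channels
```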

Generating Transformations


// Translate into transformations and submit them for execution
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeInternal(org.apache.flink.table.operations.Operation)
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeQueryOperation
org.apache.flink.table.api.internal.TableEnvironmentImpl#translate
org.apache.flink.table.delegation.Executor#createPipeline
org.apache.flink.table.delegation.Executor#executeAsync

RichInsert


CREATE TABLE table_source(
  name varchar,
  channel varchar,
  pv int,
  rowtime TIMESTAMP(3),
  WATERMARK FOR rowtime AS rowtime
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I'
);

CREATE TABLE table_side(
  name varchar PRIMARY KEY NOT ENFORCED,
  info varchar,
  rowtime TIMESTAMP(3),
  WATERMARK FOR rowtime AS rowtime
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'UA,D',
  'disable-lookup' = 'true'
);

CREATE TABLE table_sink(
  name varchar,
  channel varchar,
  pv int,
  rowtime TIMESTAMP(3),
  info varchar
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'I'
);

INSERT INTO table_sink
SELECT
  a.name,
  a.channel,
  a.pv,
  a.rowtime,
  b.info
FROM table_source a
JOIN table_side FOR SYSTEM_TIME AS OF a.rowtime AS b ON a.name = b.name
WHERE a.channel = 'channel1' AND a.pv > 0;

SqlNode -> RelNode: statement execution flow

Parsing, conversion and execution flow of the INSERT statement:


org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
org.apache.flink.table.planner.delegation.ParserImpl#parse: RichSqlInsert
*org.apache.flink.table.planner.operations.SqlToOperationConverter#convertSqlInsert: ultimately returns a CatalogSinkModifyOperation
org.apache.flink.table.planner.operations.SqlToOperationConverter#toQueryOperation: handles the SELECT part and returns PlannerQueryOperation(relational.project())
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeOperation
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeInternal(java.util.List<org.apache.flink.table.operations.ModifyOperation>)
org.apache.flink.table.api.internal.TableEnvironmentImpl#translate
org.apache.flink.table.planner.delegation.PlannerBase#translate
org.apache.flink.table.planner.delegation.PlannerBase#translateToRel validates the selected fields and converts the ModifyOperation into a Calcite relational expression: a LogicalSink for this DynamicTableSink-based connector (legacy TableSink implementations produce a LogicalLegacySink instead)
*org.apache.flink.table.planner.delegation.PlannerBase#optimize optimizes the generated relational expression; the StreamPlanner uses StreamCommonSubGraphBasedOptimizer
org.apache.flink.table.planner.delegation.PlannerBase#translateToExecNodePlan turns the optimized FlinkPhysicalRel nodes into an ExecNode plan
org.apache.flink.table.planner.delegation.StreamPlanner#translateToPlan
org.apache.flink.table.planner.plan.nodes.exec.ExecNode#translateToPlan converts each ExecNode into its corresponding Transformation
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan#createSourceTransformation
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan#createInputFormatTransformation
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink#createSinkTransformation
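translateToPlan starts from the sinks and recurses into the inputs, so every node translates its inputs before wrapping them in its own Transformation. Because the ExecNode graph is a DAG (one source may feed several sinks), a node that has already been translated should be reused rather than translated again. The sketch below shows that bottom-up, memoized walk; ExecNode, Transformation and translate_sinks are hypothetical Python stand-ins, not the Flink classes.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)
class Transformation:
    """Hypothetical stand-in for a Flink Transformation."""
    name: str
    inputs: List["Transformation"]


@dataclass(eq=False)
class ExecNode:
    """Hypothetical exec node that translates itself once and caches the result."""
    name: str
    inputs: List["ExecNode"] = field(default_factory=list)
    cached: Optional[Transformation] = None

    def translate_to_plan(self):
        # Translate inputs first (bottom-up); shared inputs hit the cache.
        if self.cached is None:
            upstream = [node.translate_to_plan() for node in self.inputs]
            self.cached = Transformation(self.name, upstream)
        return self.cached


def translate_sinks(sinks):
    """Entry point: one Transformation per sink, sharing common upstream nodes."""
    return [sink.translate_to_plan() for sink in sinks]
```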

The initially generated RelNode is shown in the figure:

Optimization

Optimization log:


astPlan ->
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(name=[$0], channel=[$1], pv=[$2], rowtime=[$3], info=[$5])
   +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
         +- LogicalFilter(condition=[=($cor0.name, $0)])
            +- LogicalSnapshot(period=[$cor0.rowtime])
               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
                  +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])


optimize subquery_rewrite cost 75 ms.
optimize result:
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(inputs=[0..3], exprs=[[$5]])
   +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
         +- LogicalFilter(condition=[=($cor0.name, $0)])
            +- LogicalSnapshot(period=[$cor0.rowtime])
               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
                  +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])

optimize temporal_join_rewrite cost 36 ms.
optimize result:
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(inputs=[0..3], exprs=[[$5]])
   +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      +- LogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, $6, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
         +- LogicalSnapshot(period=[$cor0.rowtime])
            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
               +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])

optimize decorrelate cost 46 ms.
optimize result:
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(inputs=[0..3], exprs=[[$5]])
   +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      +- LogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, $6, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
         +- LogicalSnapshot(period=[$cor0.rowtime])
            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
               +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])

optimize default_rewrite cost 327 ms.
optimize result:
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(inputs=[0], exprs=[[CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", $2, $3, $5]])
   +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      +- LogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, $6, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
         +- LogicalSnapshot(period=[$cor0.rowtime])
            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
               +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])

optimize predicate_pushdown cost 89 ms.
optimize result:
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(inputs=[0], exprs=[[CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", $2, $3, $5]])
   +- LogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, $6, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
      :- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      :  +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
      :     +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
      +- LogicalSnapshot(period=[$cor0.rowtime])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])

optimize project_rewrite cost 50 ms.
optimize result:
LogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- LogicalProject(inputs=[0], exprs=[[CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", $1, $2, $4]])
   +- LogicalJoin(condition=[AND(=($0, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[inner])
      :- LogicalProject(inputs=[0], exprs=[[$2, $3]])
      :  +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'channel1'), >($2, 0))])
      :     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
      :        +- LogicalTableScan(table=[[default_catalog, default_database, table_source]])
      +- LogicalSnapshot(period=[$cor0.rowtime])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- LogicalTableScan(table=[[default_catalog, default_database, table_side]])

optimize logical cost 1356 ms.
optimize result:
FlinkLogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- FlinkLogicalCalc(select=[name, CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS channel, pv, rowtime, info])
   +- FlinkLogicalJoin(condition=[AND(=($0, $3), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[inner])
      :- FlinkLogicalCalc(select=[name, pv, rowtime], where=[AND(=(channel, _UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(pv, 0))])
      :  +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
      :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, table_source]], fields=[name, channel, pv, rowtime])
      +- FlinkLogicalSnapshot(period=[$cor0.rowtime])
         +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, table_side]], fields=[name, info, rowtime])

optimize logical_rewrite cost 178 ms.
optimize result:
FlinkLogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- FlinkLogicalCalc(select=[name, CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS channel, pv, rowtime, info])
   +- FlinkLogicalJoin(condition=[AND(=($0, $3), __TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($3), __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[inner])
      :- FlinkLogicalCalc(select=[name, pv, rowtime], where=[AND(=(channel, _UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(pv, 0))])
      :  +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
      :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, table_source]], fields=[name, channel, pv, rowtime])
      +- FlinkLogicalSnapshot(period=[$cor0.rowtime])
         +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, table_side]], fields=[name, info, rowtime])

optimize time_indicator cost 33 ms.
optimize result:
FlinkLogicalSink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- FlinkLogicalCalc(select=[name, CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS channel, pv, rowtime, info])
   +- FlinkLogicalCalc(select=[name, pv, rowtime, name0, info, CAST(rowtime0 AS TIMESTAMP(3)) AS rowtime0])
      +- FlinkLogicalJoin(condition=[AND(=($0, $3), __TEMPORAL_JOIN_CONDITION($2, $5, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($3), __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($3)))], joinType=[inner])
         :- FlinkLogicalCalc(select=[name, pv, rowtime], where=[AND(=(channel, _UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(pv, 0))])
         :  +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
         :     +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, table_source]], fields=[name, channel, pv, rowtime])
         +- FlinkLogicalSnapshot(period=[$cor0.rowtime])
            +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$2])
               +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, table_side]], fields=[name, info, rowtime])

optimize physical cost 899 ms.
optimize result:
Sink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- Calc(select=[name, CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS channel, pv, rowtime, info])
   +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(name, name0), __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(name0), __TEMPORAL_JOIN_LEFT_KEY(name), __TEMPORAL_JOIN_RIGHT_KEY(name0)))], select=[name, pv, rowtime, name0, info, rowtime0])
      :- Exchange(distribution=[hash[name]])
      :  +- Calc(select=[name, pv, rowtime], where=[AND(=(channel, _UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(pv, 0))])
      :     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, table_source]], fields=[name, channel, pv, rowtime])
      +- Exchange(distribution=[hash[name]])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
            +- ChangelogNormalize(key=[name])
               +- Exchange(distribution=[hash[name]])
                  +- TableSourceScan(table=[[default_catalog, default_database, table_side]], fields=[name, info, rowtime])

optimize physical_rewrite cost 87 ms.
optimize result:
Sink(table=[default_catalog.default_database.table_sink], fields=[name, channel, pv, rowtime, info])
+- Calc(select=[name, CAST(_UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS channel, pv, rowtime, info])
   +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(name, name0), __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(name0), __TEMPORAL_JOIN_LEFT_KEY(name), __TEMPORAL_JOIN_RIGHT_KEY(name0)))], select=[name, pv, rowtime, name0, info, rowtime0])
      :- Exchange(distribution=[hash[name]])
      :  +- Calc(select=[name, pv, rowtime], where=[AND(=(channel, _UTF-16LE'channel1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(pv, 0))])
      :     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, table_source]], fields=[name, channel, pv, rowtime])
      +- Exchange(distribution=[hash[name]])
         +- ChangelogNormalize(key=[name])
            +- Exchange(distribution=[hash[name]])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
                  +- TableSourceScan(table=[[default_catalog, default_database, table_side]], fields=[name, info, rowtime])

subquery_rewrite

subquery_rewrite before/after comparison:

temporal_join_rewrite

temporal_join_rewrite before/after comparison:
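temporal_join_rewrite turns the LogicalCorrelate, whose right side is LogicalFilter(=($cor0.name, $0)) over a LogicalSnapshot, into a regular LogicalJoin. The correlated equality becomes a join condition over the concatenated row, so right-side indexes are shifted by the number of left fields ($0 = $4 here, because table_source has four fields), and the __INITIAL_TEMPORAL_JOIN_CONDITION marker records both time attributes and keys for later phases. The toy rewrite below reproduces only this index arithmetic and the printed condition; the Correlate and Join classes are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Correlate:
    """Hypothetical summary of Correlate(left, Filter(=($cor0.key, key), Snapshot(right)))."""
    left_fields: list   # field names of the left input
    right_fields: list  # field names of the snapshot (right) input
    left_key: str       # $cor0.<left_key> in the correlated filter
    right_key: str      # right-side field compared against it
    period: str         # $cor0.<period>, the FOR SYSTEM_TIME AS OF column
    right_time: str     # the right side's time attribute
    join_type: str


@dataclass
class Join:
    condition: str
    join_type: str


def rewrite_temporal_correlate(c):
    """Turn a correlate over a snapshot into a join with shifted field indexes."""
    offset = len(c.left_fields)  # right fields follow the left ones in the join row
    left_key = c.left_fields.index(c.left_key)
    right_key = offset + c.right_fields.index(c.right_key)
    left_time = c.left_fields.index(c.period)
    right_time = offset + c.right_fields.index(c.right_time)
    condition = (
        f"AND(=(${left_key}, ${right_key}), "
        f"__INITIAL_TEMPORAL_JOIN_CONDITION(${left_time}, ${right_time}, "
        f"__TEMPORAL_JOIN_LEFT_KEY(${left_key}), "
        f"__TEMPORAL_JOIN_RIGHT_KEY(${right_key})))"
    )
    return Join(condition, c.join_type)
```

Applied to the first example (orders with three fields, rates_history keyed by currency), the same arithmetic yields =($1, $3) and __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, ...), matching that log as well.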

decorrelate

decorrelate before/after comparison:

default_rewrite

default_rewrite before/after comparison:
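default_rewrite replaces the projected channel column ($1) with the literal 'channel1' (wrapped in a CAST to the column type), since the filter below the projection guarantees channel = 'channel1' for every row that survives. A toy version of this constant propagation, assuming filter conjuncts are given as (op, field_index, value) triples and projection items as ("ref", index) or ("literal", value); the representation is hypothetical and skips the CAST and all type handling.

```python
def propagate_constants(project_exprs, filter_conjuncts):
    """Replace input references in a projection by literals known from the filter.

    project_exprs: list of ("ref", index) or ("literal", value) items.
    filter_conjuncts: list of (op, field_index, value) triples ANDed together.
    Contradictory equalities (x = 1 AND x = 2) are not detected in this sketch.
    """
    # Equality conjuncts pin a field to a constant for every surviving row.
    known = {field: value for op, field, value in filter_conjuncts if op == "="}
    rewritten = []
    for kind, payload in project_exprs:
        if kind == "ref" and payload in known:
            rewritten.append(("literal", known[payload]))
        else:
            rewritten.append((kind, payload))
    return rewritten
```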

predicate_pushdown

predicate_pushdown before/after comparison:
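predicate_pushdown moves the WHERE conjuncts below the join: =($1, 'channel1') and >($2, 0) only reference fields of table_source ($0 to $3), so the whole filter lands on the left input, where it drops rows before they are shuffled to the join. A simplified sketch of that split for an inner join, in the spirit of Calcite's FilterJoinRule; conjuncts are modeled as (label, set of referenced field indexes) pairs, which is a hypothetical representation, and outer joins (which restrict where predicates may move) are not handled.

```python
def split_conjuncts_for_inner_join(conjuncts, left_field_count, right_field_count):
    """Partition filter conjuncts over an inner join's output row.

    conjuncts: list of (label, set_of_field_indexes) pairs.
    Returns (left, right, remaining). For right-side conjuncts the index set is
    shifted to the right input's own numbering; labels are kept unchanged.
    """
    total = left_field_count + right_field_count
    left, right, remaining = [], [], []
    for label, fields in conjuncts:
        if not all(0 <= f < total for f in fields):
            raise ValueError(f"field index out of range in {label}")
        if fields and all(f < left_field_count for f in fields):
            left.append((label, fields))
        elif fields and all(f >= left_field_count for f in fields):
            right.append((label, {f - left_field_count for f in fields}))
        else:
            # References both inputs (or no field at all): keep it above the join.
            remaining.append((label, fields))
    return left, right, remaining
```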

project_rewrite

project_rewrite before/after comparison:
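project_rewrite prunes columns that nothing above the join still needs. After default_rewrite, the join output is referenced at $0, $2, $3 and $5 by the top projection and at $0, $3, $4 and $6 by the join condition, so channel ($1) can be dropped from the left input (the new LogicalProject sits above the filter, which still needs channel), and every index above the join is renumbered: $2, $3, $5 become $1, $2, $4 and the condition becomes =($0, $3) with __INITIAL_TEMPORAL_JOIN_CONDITION($2, $5, ...). A minimal sketch of that computation; prune_join_inputs and remap are hypothetical helpers.

```python
def prune_join_inputs(used_fields, left_field_count, right_field_count):
    """Compute kept columns per join input and the old -> new index mapping.

    used_fields: join-output indexes referenced by the parent expressions
    and by the join condition itself.
    """
    kept = sorted(used_fields)
    left_kept = [f for f in kept if f < left_field_count]
    right_kept = [f - left_field_count for f in kept if f >= left_field_count]
    if any(f >= right_field_count for f in right_kept):
        raise ValueError("field index out of range")
    # Surviving fields are renumbered densely in their original order.
    mapping = {old: new for new, old in enumerate(kept)}
    return left_kept, right_kept, mapping


def remap(indexes, mapping):
    """Rewrite field references after pruning."""
    return [mapping[i] for i in indexes]
```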

logical

logical before/after comparison:

logical_rewrite

logical_rewrite before/after comparison:

time_indicator

time_indicator before/after comparison:
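time_indicator adds a Calc on top of the join that passes the left rowtime through unchanged but casts the right-side rowtime0 to a plain TIMESTAMP(3); the plan suggests that after the temporal join only the left input's time attribute remains a time attribute. A toy version of that projection, assuming fields are described as (name, is_time_attribute) pairs with the left input's fields first; the helper name is hypothetical and the fixed TIMESTAMP(3) precision is taken from this plan.

```python
def materialize_right_time_attributes(fields, left_field_count):
    """Build Calc select items that keep left time attributes and cast right ones.

    fields: list of (name, is_time_attribute) for the join output, left fields first.
    """
    select = []
    for i, (name, is_time_attr) in enumerate(fields):
        if is_time_attr and i >= left_field_count:
            # Right-side time attribute becomes a regular timestamp column.
            select.append(f"CAST({name} AS TIMESTAMP(3)) AS {name}")
        else:
            select.append(name)
    return select
```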

physical

physical before/after comparison:
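table_side is declared with 'changelog-mode' = 'UA,D', so it only emits UPDATE_AFTER and DELETE messages for its primary key. The physical plan therefore adds an Exchange on hash[name] followed by ChangelogNormalize(key=[name]), which keeps the latest row per key in state and emits a complete changelog, including the UPDATE_BEFORE rows the source omits. A simplified sketch of that normalization, assuming a single key-partitioned input and events given as (kind, key, row) with kind "+U" or "-D"; it ignores state TTL and other details of the real operator, and the function name is hypothetical.

```python
def changelog_normalize(upsert_events):
    """Toy ChangelogNormalize: turn an upsert stream into a full changelog.

    upsert_events: list of (kind, key, row) with kind in {"+U", "-D"}.
    Returns (kind, row) events with kinds +I, -U, +U and -D.
    """
    last_row = {}  # keyed state: latest row per primary key
    out = []
    for kind, key, row in upsert_events:
        previous = last_row.get(key)
        if kind == "+U":
            if previous is None:
                out.append(("+I", row))  # first version of this key
            else:
                out.append(("-U", previous))  # retract the old version
                out.append(("+U", row))
            last_row[key] = row
        elif kind == "-D":
            if previous is not None:
                # Emit the full stored row, even if the source sent only the key.
                out.append(("-D", previous))
                del last_row[key]
            # A delete for an unknown key produces no output.
        else:
            raise ValueError(f"unexpected change kind {kind}")
    return out
```

Keying the state by the primary key is also why the Exchange on hash[name] must come before ChangelogNormalize: all versions of one key have to reach the same instance.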

physical_rewrite

physical_rewrite before/after comparison:
