Flink DDL SQL with Calcite

Flink SQL uses Apache Calcite, extending it to support the parsing and validation of SQL statements.
Calcite's streaming SQL currently supports SELECT, WHERE, GROUP BY, HAVING, UNION ALL, and ORDER BY, as well as the FLOOR and CEIL functions.
This article walks through code to show how Flink SQL parses DDL statements, and how the JavaCC grammar maps to the generated Java.

Code reference for this article: the Flink SQL parser project (git link).

Project structure

The sql-parser project structure centers on its pom.xml, which pulls in calcite-core and wires up the parser code generation:

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>${calcite.version}</version>
</dependency>

<build>
    <plugins>
        <plugin>
            <!-- Extract parser grammar template from calcite-core.jar and put
                 it under ${project.build.directory} where all freemarker templates are. -->
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>unpack-parser-template</id>
                    <phase>initialize</phase>
                    <goals>
                        <goal>unpack</goal>
                    </goals>
                    <configuration>
                        <artifactItems>
                            <artifactItem>
                                <groupId>org.apache.calcite</groupId>
                                <artifactId>calcite-core</artifactId>
                                <type>jar</type>
                                <overWrite>true</overWrite>
                                <outputDirectory>${project.build.directory}/</outputDirectory>
                                <includes>**/Parser.jj</includes>
                            </artifactItem>
                        </artifactItems>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- adding fmpp code gen -->
        <plugin>
            <artifactId>maven-resources-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy-fmpp-resources</id>
                    <phase>initialize</phase>
                    <goals>
                        <goal>copy-resources</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/codegen</outputDirectory>
                        <resources>
                            <resource>
                                <directory>src/main/codegen</directory>
                                <filtering>false</filtering>
                            </resource>
                        </resources>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>com.googlecode.fmpp-maven-plugin</groupId>
            <artifactId>fmpp-maven-plugin</artifactId>
            <version>1.0</version>
            <dependencies>
                <dependency>
                    <groupId>org.freemarker</groupId>
                    <artifactId>freemarker</artifactId>
                    <version>2.3.28</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <id>generate-fmpp-sources</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>generate</goal>
                    </goals>
                    <configuration>
                        <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
                        <outputDirectory>target/generated-sources</outputDirectory>
                        <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <!-- This must be run AFTER the fmpp-maven-plugin -->
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>javacc-maven-plugin</artifactId>
            <version>2.4</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <id>javacc</id>
                    <goals>
                        <goal>javacc</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
                        <includes>
                            <include>**/Parser.jj</include>
                        </includes>
                        <!-- This must be kept synced with Apache Calcite. -->
                        <lookAhead>1</lookAhead>
                        <isStatic>false</isStatic>
                        <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <configuration>
                <forkCount>1</forkCount>
                <reuseForks>false</reuseForks>
            </configuration>
        </plugin>
    </plugins>
</build>
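With these plugins bound to the Maven lifecycle, the whole generation chain runs as part of a normal build, for example via:

mvn generate-sources

This unpacks Parser.jj from the calcite-core jar, copies the codegen resources, runs FMPP to expand the FreeMarker variables according to config.fmpp and Parser.tdd, and finally runs JavaCC on the expanded Parser.jj to emit SqlParserImpl under target/generated-sources.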

config.fmpp

#
# Calcite's parser grammar file (Parser.jj) is written in javacc
# (https://javacc.org/) with Freemarker (http://freemarker.org/) variables
# to allow clients to:
# 1. have custom parser implementation class and package name.
# 2. insert new parser method implementations written in javacc to parse
# custom:
# a) SQL statements.
# b) literals.
# c) data types.
# 3. add new keywords to support custom SQL constructs added as part of (2).
# 4. add import statements needed by inserted custom parser implementations.
#
# Parser template file (Parser.jj) along with this file are packaged as
# part of the calcite-core-<version>.jar under "codegen" directory.

# Calcite template configuration
data: {
# pull in the Parser.tdd definition file for the parser template
parser: tdd(../data/Parser.tdd)
}

# location of the FreeMarker template includes
freemarkerLinks: {
includes: includes/
}

Parser.tdd

{
# Generated parser implementation package and class name.
package: "com.matty.flink.sql.parser.impl",
class: "SqlParserImpl",

# List of additional classes and packages to import.
# Example: "org.apache.calcite.sql.*", "java.util.List".
imports: [
"com.matty.flink.sql.parser.ddl.SqlCreateTable",
"com.matty.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
"com.matty.flink.sql.parser.ddl.SqlTableColumn",
"com.matty.flink.sql.parser.ddl.SqlTableOption",
"com.matty.flink.sql.parser.ddl.SqlCreateView",
"com.matty.flink.sql.parser.ddl.SqlCreateFunction",
"com.matty.flink.sql.parser.ddl.FunctionType",
"com.matty.flink.sql.parser.dml.RichSqlInsert",
"com.matty.flink.sql.parser.dml.RichSqlInsertKeyword",
"com.matty.flink.sql.parser.type.SqlArrayType",
"com.matty.flink.sql.parser.type.SqlBytesType",
"com.matty.flink.sql.parser.type.SqlMapType",
"com.matty.flink.sql.parser.type.SqlMultisetType",
"com.matty.flink.sql.parser.type.SqlRowType",
"com.matty.flink.sql.parser.type.SqlStringType",
"com.matty.flink.sql.parser.type.SqlTimestampType",
"com.matty.flink.sql.parser.type.SqlTimeType",
"com.matty.flink.sql.parser.validate.FlinkSqlConformance",
"com.matty.flink.sql.parser.FlinkSqlDataTypeSpec",
"org.apache.calcite.sql.SqlDrop",
"org.apache.calcite.sql.SqlCreate",
"java.util.List",
"java.util.ArrayList"
]

# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
# keyword, please also add it to the 'nonReservedKeywords' section.
keywords: [
"COMMENT",
"PARTITIONED",
"IF",
"WATERMARK",
"withOffset"
"ASCENDING",
"FROM_SOURCE",
"BOUNDED",
"DELAY",
"OVERWRITE",
"STRING",
"BYTES",
"offset",
"reset",
"SCALA"
]

# List of keywords from the "keywords" section that are not reserved.
nonReservedKeywords: [
"A"
"ABSENT"
"ABSOLUTE"
"ACTION"
"ADA"
"ADD"
"ADMIN"
"AFTER"
"ALWAYS"
"APPLY"
"ASC"
"ASSERTION"
"ASSIGNMENT"
"ATTRIBUTE"
"ATTRIBUTES"
"BEFORE"
"BERNOULLI"
"BREADTH"
"C"
"CASCADE"
"CATALOG"
"CATALOG_NAME"
"CENTURY"
"CHAIN"
"CHARACTER_SET_CATALOG"
"CHARACTER_SET_NAME"
"CHARACTER_SET_SCHEMA"
"CHARACTERISTICS"
"CHARACTERS"
"CLASS_ORIGIN"
"COBOL"
"COLLATION"
"COLLATION_CATALOG"
"COLLATION_NAME"
"COLLATION_SCHEMA"
"COLUMN_NAME"
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
"CONDITION_NUMBER"
"CONDITIONAL"
"CONNECTION"
"CONNECTION_NAME"
"CONSTRAINT_CATALOG"
"CONSTRAINT_NAME"
"CONSTRAINT_SCHEMA"
"CONSTRAINTS"
"CONSTRUCTOR"
"CONTINUE"
"CURSOR_NAME"
"DATA"
"DATABASE"
"DATETIME_INTERVAL_CODE"
"DATETIME_INTERVAL_PRECISION"
"DECADE"
"DEFAULTS"
"DEFERRABLE"
"DEFERRED"
"DEFINED"
"DEFINER"
"DEGREE"
"DEPTH"
"DERIVED"
"DESC"
"DESCRIPTION"
"DESCRIPTOR"
"DIAGNOSTICS"
"DISPATCH"
"DOMAIN"
"DOW"
"DOY"
"DYNAMIC_FUNCTION"
"DYNAMIC_FUNCTION_CODE"
"ENCODING"
"EPOCH"
"ERROR"
"EXCEPTION"
"EXCLUDE"
"EXCLUDING"
"FINAL"
"FIRST"
"FOLLOWING"
"FORMAT"
"FORTRAN"
"FOUND"
"FRAC_SECOND"
"G"
"GENERAL"
"GENERATED"
"GEOMETRY"
"GO"
"GOTO"
"GRANTED"
"HIERARCHY"
"IGNORE"
"IMMEDIATE"
"IMMEDIATELY"
"IMPLEMENTATION"
"INCLUDING"
"INCREMENT"
"INITIALLY"
"INPUT"
"INSTANCE"
"INSTANTIABLE"
"INVOKER"
"ISODOW"
"ISOYEAR"
"ISOLATION"
"JAVA"
"JSON"
"K"
"KEY"
"KEY_MEMBER"
"KEY_TYPE"
"LABEL"
"LAST"
"LENGTH"
"LEVEL"
"LIBRARY"
"LOCATOR"
"M"
"MATCHED"
"MAXVALUE"
"MICROSECOND"
"MESSAGE_LENGTH"
"MESSAGE_OCTET_LENGTH"
"MESSAGE_TEXT"
"MILLISECOND"
"MILLENNIUM"
"MINVALUE"
"MORE_"
"MUMPS"
"NAME"
"NAMES"
"NANOSECOND"
"NESTING"
"NORMALIZED"
"NULLABLE"
"NULLS"
"NUMBER"
"OBJECT"
"OCTETS"
"OPTION"
"OPTIONS"
"ORDERING"
"ORDINALITY"
"OTHERS"
"OUTPUT"
"OVERRIDING"
"PAD"
"PARAMETER_MODE"
"PARAMETER_NAME"
"PARAMETER_ORDINAL_POSITION"
"PARAMETER_SPECIFIC_CATALOG"
"PARAMETER_SPECIFIC_NAME"
"PARAMETER_SPECIFIC_SCHEMA"
"PARTIAL"
"PASCAL"
"PASSING"
"PASSTHROUGH"
"PAST"
"PATH"
"PLACING"
"PLAN"
"PLI"
"PRECEDING"
"PRESERVE"
"PRIOR"
"PRIVILEGES"
"PUBLIC"
"QUARTER"
"READ"
"RELATIVE"
"REPEATABLE"
"REPLACE"
"RESPECT"
"RESTART"
"RESTRICT"
"RETURNED_CARDINALITY"
"RETURNED_LENGTH"
"RETURNED_OCTET_LENGTH"
"RETURNED_SQLSTATE"
"RETURNING"
"ROLE"
"ROUTINE"
"ROUTINE_CATALOG"
"ROUTINE_NAME"
"ROUTINE_SCHEMA"
"ROW_COUNT"
"SCALAR"
"SCALE"
"SCHEMA"
"SCHEMA_NAME"
"SCOPE_CATALOGS"
"SCOPE_NAME"
"SCOPE_SCHEMA"
"SECTION"
"SECURITY"
"SELF"
"SEQUENCE"
"SERIALIZABLE"
"SERVER"
"SERVER_NAME"
"SESSION"
"SETS"
"SIMPLE"
"SIZE"
"SOURCE"
"SPACE"
"SPECIFIC_NAME"
"SQL_BIGINT"
"SQL_BINARY"
"SQL_BIT"
"SQL_BLOB"
"SQL_BOOLEAN"
"SQL_CHAR"
"SQL_CLOB"
"SQL_DATE"
"SQL_DECIMAL"
"SQL_DOUBLE"
"SQL_FLOAT"
"SQL_INTEGER"
"SQL_INTERVAL_DAY"
"SQL_INTERVAL_DAY_TO_HOUR"
"SQL_INTERVAL_DAY_TO_MINUTE"
"SQL_INTERVAL_DAY_TO_SECOND"
"SQL_INTERVAL_HOUR"
"SQL_INTERVAL_HOUR_TO_MINUTE"
"SQL_INTERVAL_HOUR_TO_SECOND"
"SQL_INTERVAL_MINUTE"
"SQL_INTERVAL_MINUTE_TO_SECOND"
"SQL_INTERVAL_MONTH"
"SQL_INTERVAL_SECOND"
"SQL_INTERVAL_YEAR"
"SQL_INTERVAL_YEAR_TO_MONTH"
"SQL_LONGVARBINARY"
"SQL_LONGVARNCHAR"
"SQL_LONGVARCHAR"
"SQL_NCHAR"
"SQL_NCLOB"
"SQL_NUMERIC"
"SQL_NVARCHAR"
"SQL_REAL"
"SQL_SMALLINT"
"SQL_TIME"
"SQL_TIMESTAMP"
"SQL_TINYINT"
"SQL_TSI_DAY"
"SQL_TSI_FRAC_SECOND"
"SQL_TSI_HOUR"
"SQL_TSI_MICROSECOND"
"SQL_TSI_MINUTE"
"SQL_TSI_MONTH"
"SQL_TSI_QUARTER"
"SQL_TSI_SECOND"
"SQL_TSI_WEEK"
"SQL_TSI_YEAR"
"SQL_VARBINARY"
"SQL_VARCHAR"
"STATE"
"STATEMENT"
"STRUCTURE"
"STYLE"
"SUBCLASS_ORIGIN"
"SUBSTITUTE"
"TABLE_NAME"
"TEMPORARY"
"TIES"
"TIMESTAMPADD"
"TIMESTAMPDIFF"
"TOP_LEVEL_COUNT"
"TRANSACTION"
"TRANSACTIONS_ACTIVE"
"TRANSACTIONS_COMMITTED"
"TRANSACTIONS_ROLLED_BACK"
"TRANSFORM"
"TRANSFORMS"
"TRIGGER_CATALOG"
"TRIGGER_NAME"
"TRIGGER_SCHEMA"
"TYPE"
"UNBOUNDED"
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
"USER_DEFINED_TYPE_CODE"
"USER_DEFINED_TYPE_NAME"
"USER_DEFINED_TYPE_SCHEMA"
"UTF8"
"UTF16"
"UTF32"
"VERSION"
"VIEW"
"WEEK"
"WRAPPER"
"WORK"
"WRITE"
"XML"
"ZONE",

# not in core, added in Flink
"PARTITIONED",
"IF",
"ASCENDING",
"FROM_SOURCE",
"BOUNDED",
"DELAY",
"OVERWRITE",
"offset",
"reset"
]

# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
]

# List of methods for parsing custom literals.
# Return type of method implementation should be "SqlNode".
# Example: ParseJsonLiteral().
literalParserMethods: [
]

# List of methods for parsing ddl supported data types.
# Return type of method implementation should be "SqlIdentifier".
# Example: SqlParseTimeStampZ().
flinkDataTypeParserMethods: [
"SqlArrayType()",
"SqlMultisetType()",
"SqlMapType()",
"SqlRowType()",
"SqlStringType()",
"SqlBytesType()",
"SqlTimestampType()",
"SqlTimeType()"
]

# List of methods for parsing custom data types.
# Return type of method implementation should be "SqlIdentifier".
# Example: SqlParseTimeStampZ().
dataTypeParserMethods: [
]

# List of methods for parsing builtin function calls.
# Return type of method implementation should be "SqlNode".
# Example: DateFunctionCall().
builtinFunctionCallMethods: [
]

# List of methods for parsing extensions to "ALTER <scope>" calls.
# Each must accept arguments "(SqlParserPos pos, String scope)".
# Example: "SqlUploadJarNode"
# 解析扩展到"Alter"调用的方法。
# 每个都必须接受参数 "(SqlParserPos pos, String scope)"
alterStatementParserMethods: [
]

# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
# 解析扩展以"CREATE [OR REPLACE]"调用的方法列表。
# 每个都必须接受参数 "(SqlParserPos pos, boolean replace)"
createStatementParserMethods: [
"SqlCreateTable",
"SqlCreateView",
"SqlCreateFunction"
]

# List of methods for parsing extensions to "DROP" calls.
# Each must accept arguments "(Span s)".
# 解析扩展到"DROP"调用的方法列表。
# 每个都必须接受参数 "(SqlParserPos pos)"
dropStatementParserMethods: [
]

# List of files in the @includes directory that have parser method
# implementations for parsing custom SQL statements, literals or types
# given as part of "statementParserMethods", "literalParserMethods" or
# "dataTypeParserMethods".
implementationFiles: [
"parserImpls.ftl"
]

# List of additional join types. Each is a method with no arguments.
# Example: LeftSemiJoin()
joinTypes: [
]

includeCompoundIdentifier: true
includeBraces: true
includeAdditionalDeclarations: false
}

The sections below walk through Parser.tdd together with the Parser.jj template that is packaged inside the calcite-core jar.

package, class, imports

These settings control the imports, the package of the generated parser, and the name of the generated class:

PARSER_BEGIN(${parser.class})

package ${parser.package};

<#list parser.imports as importStr>
import ${importStr};
</#list>

keywords

Keyword definitions:

<DEFAULT, DQID, BTID> TOKEN :
{
...
<#-- additional parser keywords are included here -->
<#list parser.keywords as keyword>
| < ${keyword}: "${keyword}" >
</#list>
}

nonReservedKeywords

Non-reserved keywords from the keyword definitions above:

String NonReservedKeyWord() :
{
}
{
(
NonReservedKeyWord0of3()
| NonReservedKeyWord1of3()
| NonReservedKeyWord2of3()
)
{
return unquotedIdentifier();
}
}

/** @see #NonReservedKeyWord */
void NonReservedKeyWord0of3() :
{
}
{
(
<#list parser.nonReservedKeywords as keyword>
<#if keyword?index == 0>
<${keyword}>
<#elseif keyword?index % 3 == 0>
| <${keyword}>
</#if>
</#list>
)
}

/** @see #NonReservedKeyWord */
void NonReservedKeyWord1of3() :
{
}
{
(
<#list parser.nonReservedKeywords as keyword>
<#if keyword?index == 1>
<${keyword}>
<#elseif keyword?index % 3 == 1>
| <${keyword}>
</#if>
</#list>
)
}

/** @see #NonReservedKeyWord */
void NonReservedKeyWord2of3() :
{
}
{
(
<#list parser.nonReservedKeywords as keyword>
<#if keyword?index == 2>
<${keyword}>
<#elseif keyword?index % 3 == 2>
| <${keyword}>
</#if>
</#list>
)
}

joinTypes

Join types:

SqlLiteral JoinType() :
{
JoinType joinType;
}
{
(
<JOIN> { joinType = JoinType.INNER; }
|
<INNER> <JOIN> { joinType = JoinType.INNER; }
|
<LEFT> [ <OUTER> ] <JOIN> { joinType = JoinType.LEFT; }
|
<RIGHT> [ <OUTER> ] <JOIN> { joinType = JoinType.RIGHT; }
|
<FULL> [ <OUTER> ] <JOIN> { joinType = JoinType.FULL; }
|
<CROSS> <JOIN> { joinType = JoinType.CROSS; }
<#list parser.joinTypes as method>
|
joinType = ${method}()
</#list>
)
{
return joinType.symbol(getPos());
}
}

statementParserMethods

Methods for parsing custom SQL statements; each must return a SqlNode:

/**
* Parses an SQL statement.
*/
SqlNode SqlStmt() :
{
SqlNode stmt;
}
{
(
<#-- Add methods to parse additional statements here -->
<#list parser.statementParserMethods as method>
LOOKAHEAD(2) stmt = ${method}
|
</#list>
stmt = SqlSetOption(Span.of(), null)
|
stmt = SqlAlter()
|
<#if parser.createStatementParserMethods?size != 0>
stmt = SqlCreate()
|
</#if>
<#if parser.dropStatementParserMethods?size != 0>
stmt = SqlDrop()
|
</#if>
stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
|
stmt = SqlExplain()
|
stmt = SqlDescribe()
|
stmt = SqlInsert()
|
stmt = SqlDelete()
|
stmt = SqlUpdate()
|
stmt = SqlMerge()
|
stmt = SqlProcedureCall()
)
{
return stmt;
}
}

literalParserMethods

Methods for parsing custom literals; each must return a SqlNode:

SqlNode Literal() :
{
SqlNode e;
}
{
(
e = NumericLiteral()
|
e = StringLiteral()
|
e = SpecialLiteral()
|
e = DateTimeLiteral()
|
e = IntervalLiteral()
<#-- additional literal parser methods are included here -->
<#list parser.literalParserMethods as method>
|
e = ${method}
</#list>
)
{
return e;
}


}

dataTypeParserMethods

Methods for parsing custom data types; each must return a SqlIdentifier:

// Some SQL type names need special handling due to the fact that they have
// spaces in them but are not quoted.
SqlIdentifier TypeName() :
{
final SqlTypeName sqlTypeName;
final SqlIdentifier typeName;
final Span s = Span.of();
}
{
(
LOOKAHEAD(2)
sqlTypeName = SqlTypeName(s) {
typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
}
<#-- additional types are included here -->
<#list parser.dataTypeParserMethods as method>
|
typeName = ${method}
</#list>
|
typeName = CollectionsTypeName()
|
typeName = CompoundIdentifier()
)
{
return typeName;
}
}

alterStatementParserMethods

Methods for parsing custom ALTER statements; each is invoked with the arguments (SqlParserPos pos, String scope):

/**
* Parses an expression for setting or resetting an option in SQL, such as QUOTED_IDENTIFIERS,
* or explain plan level (physical/logical).
*/
SqlAlter SqlAlter() :
{
final Span s;
final String scope;
final SqlAlter alterNode;
}
{
<ALTER> { s = span(); }
scope = Scope()
(
<#-- additional literal parser methods are included here -->
<#list parser.alterStatementParserMethods as method>
alterNode = ${method}(s, scope)
|
</#list>

alterNode = SqlSetOption(s, scope)
)
{
return alterNode;
}
}

createStatementParserMethods

Methods for parsing custom CREATE statements; each is invoked with the arguments (SqlParserPos pos, boolean replace):

<#if parser.createStatementParserMethods?size != 0>
/**
* Parses a CREATE statement.
*/
SqlCreate SqlCreate() :
{
final Span s;
boolean replace = false;
final SqlCreate create;
}
{
<CREATE> { s = span(); }
[
<OR> <REPLACE> {
replace = true;
}
]
(
<#-- additional literal parser methods are included here -->
<#list parser.createStatementParserMethods as method>
create = ${method}(s, replace)
<#sep>|</#sep>
</#list>
)
{
return create;
}
}
</#if>

dropStatementParserMethods

Methods for parsing custom DROP statements; each is invoked with the current Span (see the template below):

<#if parser.dropStatementParserMethods?size != 0>
/**
* Parses a DROP statement.
*/
SqlDrop SqlDrop() :
{
final Span s;
boolean replace = false;
final SqlDrop drop;
}
{
<DROP> { s = span(); }
(
<#-- additional literal parser methods are included here -->
<#list parser.dropStatementParserMethods as method>
drop = ${method}(s, replace)
<#sep>|</#sep>
</#list>
)
{
return drop;
}
}
</#if>

implementationFiles

Custom template files:

<#-- Add implementations of additional parser statement calls here -->
<#list parser.implementationFiles as file>
<#include "/@includes/"+file />
</#list>

includeCompoundIdentifier

Whether to include the CompoundIdentifier parsing rules; these are needed to parse dotted compound names such as catalog.db.table.

Common Parser.jj methods

getPos

Returns the position (SqlParserPos) of the current token; used when writing custom parse rules.

StringLiteral

Parses a string-typed value in the statement, wrapped in single quotes:

select * from table where a='string';

Identifier

Parses an identifier field and returns it as a String:

select Identifier from table;

An identifier may be a SimpleIdentifier (a table name, column name, alias, and so on),
or a CompoundIdentifier for dotted compound names such as kafka.bootstrap.servers, as the sketch below illustrates.
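As a quick illustration of the difference, here is a minimal sketch that uses Calcite's public SqlIdentifier API directly (independent of this project; the values are made up):

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Arrays;

public class IdentifierDemo {
    public static void main(String[] args) {
        // A SimpleIdentifier carries exactly one name segment.
        SqlIdentifier simple = new SqlIdentifier("channel", SqlParserPos.ZERO);

        // A CompoundIdentifier keeps every dot-separated segment.
        SqlIdentifier compound = new SqlIdentifier(
                Arrays.asList("kafka", "bootstrap", "servers"), SqlParserPos.ZERO);

        System.out.println(simple.isSimple());   // true
        System.out.println(compound.names);      // [kafka, bootstrap, servers]
        System.out.println(compound);            // kafka.bootstrap.servers
    }
}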

The sections below show how all of this is applied in real Flink SQL parsing:

DDL

CREATE TABLE

Custom SQL syntax

Custom SQL DDL statements create source tables, dimension (side) tables, and sink tables. A few common CREATE TABLE examples:

  • Create a Kafka source table with a computed column and a watermark definition

    CREATE TABLE table_source(
    channel varchar, -- channel
    pv int, -- page views
    xctime varchar, -- timestamp in yyyyMMddHHmmss format, as a string
    ts AS TO_TIMESTAMP(xctime,'yyyyMMddHHmmss'), -- rowtime, must be TIMESTAMP or LONG
    WATERMARK FOR ts AS withOffset( ts , '120000' ) -- watermark strategy: allow 2 minutes of out-of-orderness, i.e. data may arrive up to 2 minutes late
    )WITH(
    type='kafka11',
    kafka.bootstrap.servers='mwt:9092',
    kafka.auto.offset.reset='latest',
    kafka.kerberos.enabled='false',
    kafka.data.type='json',
    kafka.topic='table_source',
    parallelism='1'
    )
  • Create a MySQL dimension (side) table

    CREATE TABLE table_side(
    name varchar comment 'name', -- columns may carry a COMMENT
    info varchar comment 'details',
    PRIMARY KEY(name), -- a dimension table must declare a PRIMARY KEY
    PERIOD FOR SYSTEM_TIME -- dimension (side) table marker
    )WITH(
    type ='mysql',
    url ='jdbc:mysql://192.168.1.8:3306/demo?charset=utf8',
    userName ='dipper',
    password ='ide@123',
    tableName ='table_side',
    cache ='NONE',
    parallelism ='1'
    )
  • Create a sink table

    CREATE TABLE table_sink(
    name varchar,
    channel varchar,
    pv int,
    xctime bigint,
    info varchar
    )WITH(
    type ='console',
    parallelism='1'
    );

Statement execution flow


org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
org.apache.flink.table.planner.delegation.ParserImpl#parse
org.apache.flink.table.planner.operations.SqlCreateTableConverter#convertCreateTable: returns a CreateTableOperation carrying the CatalogTable
org.apache.flink.table.api.internal.TableEnvironmentImpl#executeOperation
org.apache.flink.table.catalog.CatalogManager#createTable
org.apache.flink.table.catalog.GenericInMemoryCatalog#createTable
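From user code, this whole chain is triggered by a single call. A minimal sketch against the standard Flink (1.11+) Table API; the table schema and connector properties here are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExecuteDdlDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // executeSql parses the DDL (ParserImpl#parse), converts it into a
        // CreateTableOperation, and registers the table via the CatalogManager.
        tEnv.executeSql(
                "CREATE TABLE table_sink (name STRING, pv INT) " +
                "WITH ('connector' = 'print')");
    }
}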

Defining the parse result classes

SqlCreateTable.java (git link)
package com.matty.flink.sql.parser.ddl;

import com.matty.flink.sql.parser.ExtendedSqlNode;
import com.matty.flink.sql.parser.exception.SqlParseException;
import lombok.Data;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.calcite.util.NlsString;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Objects.requireNonNull;

/**
* Description:
*
* @author mwt
* @version 1.0
* @date 2019-10-08
*/
@Data
public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);

private final SqlIdentifier tableName;

private final SqlNodeList columnList;

private final SqlNodeList propertyList;

private final SqlNodeList primaryKeyList;

private final SqlCharStringLiteral comment;

private final boolean sideFlag;

private SqlIdentifier eventTimeField;

private SqlNode maxOutOrderless;

public SqlCreateTable(
SqlParserPos pos,
SqlIdentifier tableName,
SqlNodeList columnList,
SqlNodeList primaryKeyList,
SqlNodeList propertyList,
SqlCharStringLiteral comment,
boolean sideFlag,
SqlIdentifier eventTimeField,
SqlNode maxOutOrderless) {
super(OPERATOR, pos, false, false);
this.tableName = requireNonNull(tableName, "Table name is missing");
this.columnList = requireNonNull(columnList, "Column list should not be null");
this.primaryKeyList = primaryKeyList;
this.propertyList = propertyList;
this.comment = comment;
this.sideFlag = sideFlag;
this.eventTimeField = eventTimeField;
this.maxOutOrderless = maxOutOrderless;
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(tableName, columnList, primaryKeyList,
propertyList, comment);
}

public Long getMaxOutOrderless() {
return Long.parseLong(((NlsString) SqlLiteral.value(maxOutOrderless)).getValue());
}


@Override
public void validate() throws SqlParseException {
Set<String> columnNames = new HashSet<>();
if (columnList != null) {
for (SqlNode column : columnList) {
String columnName = null;
if (column instanceof SqlTableColumn) {
SqlTableColumn tableColumn = (SqlTableColumn) column;
columnName = tableColumn.getName().getSimple();
if (tableColumn.getAlias() != null) {
columnName = tableColumn.getAlias().getSimple();
}
String typeName = tableColumn.getType().getTypeName().getSimple();
if (SqlColumnType.getType(typeName).isUnsupported()) {
throw new SqlParseException(
column.getParserPosition(),
"Not support type [" + typeName + "], at " + column.getParserPosition());
}
} else if (column instanceof SqlBasicCall) {
SqlBasicCall tableColumn = (SqlBasicCall) column;
columnName = tableColumn.getOperands()[1].toString();
}

if (!columnNames.add(columnName)) {
throw new SqlParseException(
column.getParserPosition(),
"Duplicate column name [" + columnName + "], at " +
column.getParserPosition());
}
}
}

if (this.primaryKeyList != null) {
for (SqlNode primaryKeyNode : this.primaryKeyList) {
String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
if (!columnNames.contains(primaryKey)) {
throw new SqlParseException(
primaryKeyNode.getParserPosition(),
"Primary key [" + primaryKey + "] not defined in columns, at " +
primaryKeyNode.getParserPosition());
}
}
}

}

public boolean containsComputedColumn() {
for (SqlNode column : columnList) {
if (column instanceof SqlBasicCall) {
return true;
}
}
return false;
}

/**
* Returns the projection format of the DDL columns(including computed columns).
* e.g. If we got a DDL:
* <pre>
* create table tbl1(
* col1 int,
* col2 varchar,
* col3 as to_timestamp(col2)
* ) with (
* 'connector' = 'csv'
* )
* </pre>
* we would return a query like:
*
* <p>"col1, col2, to_timestamp(col2) as col3", caution that the "computed column" operands
* have been reversed.
*/
public String getColumnSqlString() {
SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
writer.setAlwaysUseParentheses(true);
writer.setSelectListItemsOnSeparateLines(false);
writer.setIndentation(0);
writer.startList("", "");
for (SqlNode column : columnList) {
writer.sep(",");
if (column instanceof SqlTableColumn) {
SqlTableColumn tableColumn = (SqlTableColumn) column;
tableColumn.getName().unparse(writer, 0, 0);
} else {
column.unparse(writer, 0, 0);
}
}

return writer.toString();
}

@Override
public void unparse(
SqlWriter writer,
int leftPrec,
int rightPrec) {
writer.keyword("CREATE TABLE");
tableName.unparse(writer, leftPrec, rightPrec);
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
for (SqlNode column : columnList) {
printIndent(writer);
if (column instanceof SqlBasicCall) {
SqlCall call = (SqlCall) column;
SqlCall newCall = call.getOperator().createCall(
SqlParserPos.ZERO,
call.operand(1),
call.operand(0));
newCall.unparse(writer, leftPrec, rightPrec);
} else {
column.unparse(writer, leftPrec, rightPrec);
}
}

if (primaryKeyList != null && primaryKeyList.size() > 0) {
printIndent(writer);
writer.keyword("PRIMARY KEY");
SqlWriter.Frame keyFrame = writer.startList("(", ")");
primaryKeyList.unparse(writer, leftPrec, rightPrec);
writer.endList(keyFrame);
}

if (sideFlag) {
printIndent(writer);
writer.keyword("PERIOD FOR SYSTEM_TIME");
}

if (eventTimeField != null) {
printIndent(writer);
writer.keyword("WATERMARK FOR ");
eventTimeField.unparse(writer, leftPrec, rightPrec);
writer.keyword("AS withOffset");
SqlWriter.Frame offsetFrame = writer.startList("(", ")");
eventTimeField.unparse(writer, leftPrec, rightPrec);
writer.keyword(",");
maxOutOrderless.unparse(writer, leftPrec, rightPrec);
writer.endList(offsetFrame);
}

writer.newlineAndIndent();
writer.endList(frame);

if (comment != null) {
writer.newlineAndIndent();
writer.keyword("COMMENT");
comment.unparse(writer, leftPrec, rightPrec);
}

if (propertyList != null) {
writer.keyword("WITH");
SqlWriter.Frame withFrame = writer.startList("(", ")");
for (SqlNode property : propertyList) {
printIndent(writer);
property.unparse(writer, leftPrec, rightPrec);
}
writer.newlineAndIndent();
writer.endList(withFrame);
}
}

private void printIndent(SqlWriter writer) {
writer.sep(",", false);
writer.newlineAndIndent();
writer.print(" ");
}

/**
* Table creation context.
*/
public static class TableCreationContext {
public List<SqlNode> columnList = new ArrayList<>();
public SqlNodeList primaryKeyList;
public boolean sideFlag;
public SqlIdentifier eventTimeField;
public SqlNode maxOutOrderless;
}

public String[] fullTableName() {
return tableName.names.toArray(new String[0]);
}
}
SqlTableColumn.java (git link)
@Data
public class SqlTableColumn extends SqlCall {
private static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);

private SqlIdentifier name;
private SqlDataTypeSpec type;
private SqlIdentifier alias;
private SqlCharStringLiteral comment;

public SqlTableColumn(SqlIdentifier name,
SqlDataTypeSpec type,
SqlIdentifier alias,
SqlCharStringLiteral comment,
SqlParserPos pos) {
super(pos);
this.name = requireNonNull(name, "Column name should not be null");
this.type = requireNonNull(type, "Column type should not be null");
this.alias = alias;
this.comment = comment;
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(name, type, comment);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
this.name.unparse(writer, leftPrec, rightPrec);
writer.print(" ");
ExtendedSqlType.unparseType(type, writer, leftPrec, rightPrec);
if (this.alias != null) {
writer.print(" AS ");
this.alias.unparse(writer, leftPrec, rightPrec);
}
if (this.comment != null) {
writer.print(" COMMENT ");
this.comment.unparse(writer, leftPrec, rightPrec);
}
}

}

JavaCC grammar template

SqlCreateTable
/**
* Parse a table creation.
*/
SqlCreate SqlCreateTable(Span s, boolean replace) :
{
final SqlParserPos startPos = s.pos();
SqlIdentifier tableName;
SqlNodeList primaryKeyList = null;
SqlNodeList columnList = SqlNodeList.EMPTY;
SqlCharStringLiteral comment = null;
boolean sideFlag = false;
SqlIdentifier eventTimeField = null;
SqlNode maxOutOrderless = null;

SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
}
{
<TABLE>

tableName = CompoundIdentifier()
[
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
TableColumn(ctx)
(
<COMMA> TableColumn(ctx)
)*
{
pos = pos.plus(getPos());
columnList = new SqlNodeList(ctx.columnList, pos);
primaryKeyList = ctx.primaryKeyList;
sideFlag = ctx.sideFlag;
eventTimeField = ctx.eventTimeField;
maxOutOrderless = ctx.maxOutOrderless;
}
<RPAREN>
]

[ <COMMENT> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}]
[
<WITH>
propertyList = TableProperties()
]
{
return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
primaryKeyList,
propertyList,
comment,
sideFlag,
eventTimeField,
maxOutOrderless);
}
}

In the generated SqlParserImpl class, the SqlStmt() method is the entry point:

  1. First match the token "CREATE"
  2. Match the token "TABLE"
  3. Match the token "table_source"
  4. Match the token "("
  5. Parse the TableColumn entries that follow "(" (plain columns, primary key, watermark, and the dimension-table marker), recording them in the TableCreationContext;
    each individual column is delegated to the TableColumn(ctx) method
  6. Match the token ")"
  7. Match the token "COMMENT"
  8. Match the token "WITH", delegating to the TableProperties() method
  9. Return the Java object SqlCreateTable
public class SqlParserImpl extends SqlAbstractParserImpl implements SqlParserImplConstants {

/**
* Parses an SQL statement.
*/
final public SqlNode SqlStmt() throws ParseException {
SqlNode stmt;
if (jj_2_4(2)) {
stmt = RichSqlInsert();
} else {
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case RESET:
case SET:
stmt = SqlSetOption(Span.of(), null);
break;
case ALTER:
stmt = SqlAlter();
break;

// 1. First match the token "CREATE"
case CREATE:
stmt = SqlCreate();
break;
case A:
case ABS:
case ABSENT:
case ABSOLUTE:
...
case UNICODE_QUOTED_IDENTIFIER:
stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY);
break;
case EXPLAIN:
stmt = SqlExplain();
break;
case DESCRIBE:
stmt = SqlDescribe();
break;
case INSERT:
case UPSERT:
stmt = SqlInsert();
break;
case DELETE:
stmt = SqlDelete();
break;
case UPDATE:
stmt = SqlUpdate();
break;
case MERGE:
stmt = SqlMerge();
break;
case CALL:
stmt = SqlProcedureCall();
break;
default:
jj_la1[21] = jj_gen;
jj_consume_token(-1);
throw new ParseException();
}
}
{if (true) return stmt;}
throw new Error("Missing return statement in function");
}


/**
* Parses a CREATE statement.
*/
final public SqlCreate SqlCreate() throws ParseException {
final Span s;
boolean replace = false;
final SqlCreate create;
jj_consume_token(CREATE);
s = span();
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case OR:
jj_consume_token(OR);
jj_consume_token(REPLACE);
replace = true;
break;
default:
jj_la1[206] = jj_gen;
;
}
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {

// 2. Match the token "TABLE"
case TABLE:
create = SqlCreateTable(s, replace);
break;
case VIEW:
create = SqlCreateView(s, replace);
break;
case FUNCTION:
create = SqlCreateFunction(s, replace);
break;
case CEPRULE:
create = SqlCreateCepRule(s, replace);
break;
default:
jj_la1[207] = jj_gen;
jj_consume_token(-1);
throw new ParseException();
}
{if (true) return create;}
throw new Error("Missing return statement in function");
}


/**
* Parse a table creation.
*/
final public SqlCreate SqlCreateTable(Span s, boolean replace) throws ParseException {
final SqlParserPos startPos = s.pos();
SqlIdentifier tableName;
SqlNodeList primaryKeyList = null;
SqlNodeList columnList = SqlNodeList.EMPTY;
SqlCharStringLiteral comment = null;
boolean sideFlag = false;
SqlIdentifier eventTimeField = null;
SqlNode maxOutOrderless = null;

SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
jj_consume_token(TABLE);

// 3. Match the token "table_source"
tableName = CompoundIdentifier();
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {

// 4. Match the token "("
case LPAREN:
jj_consume_token(LPAREN);
pos = getPos(); TableCreationContext ctx = new TableCreationContext();

// 5. Parse the TableColumn entries after "(" (plain columns, primary key,
//    watermark, dimension-table marker) into the TableCreationContext,
//    delegating each one to TableColumn(ctx)
TableColumn(ctx);
label_8:
while (true) {
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case COMMA:
;
break;
default:
jj_la1[32] = jj_gen;
break label_8;
}
jj_consume_token(COMMA);
TableColumn(ctx);
}
pos = pos.plus(getPos());
columnList = new SqlNodeList(ctx.columnList, pos);
primaryKeyList = ctx.primaryKeyList;
sideFlag = ctx.sideFlag;
eventTimeField = ctx.eventTimeField;
maxOutOrderless = ctx.maxOutOrderless;

// 6. Match the token ")"
jj_consume_token(RPAREN);
break;
default:
jj_la1[33] = jj_gen;
;
}


switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
// 7. Match the token "COMMENT"
case COMMENT:
jj_consume_token(COMMENT);
jj_consume_token(QUOTED_STRING);
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
break;
default:
jj_la1[34] = jj_gen;
;
}

switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
// 8. Match the token "WITH"
case WITH:
jj_consume_token(WITH);
// delegate to TableProperties()
propertyList = TableProperties();
break;
default:
jj_la1[35] = jj_gen;
;
}
// 9. Return the Java object SqlCreateTable
{if (true) return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
primaryKeyList,
propertyList,
comment,
sideFlag,
eventTimeField,
maxOutOrderless);}
throw new Error("Missing return statement in function");
}


}
TableColumn

The TableCreationContext is passed as an argument to TableColumn, and every parse result is recorded on it: plain columns, primary key information, computed columns, the watermark definition, and the dimension-table marker.

/**
* Parses one table column entry.
*/
void TableColumn(TableCreationContext context) :
{
}
{
(LOOKAHEAD(3)
TableColumn2(context.columnList)
|
context.primaryKeyList = PrimaryKey()
|
Watermark(context)
|
ComputedColumn(context)
|
context.sideFlag = SideFlag()
)
}
  • TableColumn2
    Parses one column definition; its type is the custom FlinkSqlDataTypeSpec
void TableColumn2(List<SqlNode> list) :
{
SqlParserPos pos;
SqlIdentifier name;
SqlDataTypeSpec type;
SqlIdentifier alias = null;
SqlCharStringLiteral comment = null;
}
{
name = SimpleIdentifier()
<#-- #FlinkDataType already takes care of the nullable attribute. -->
type = FlinkDataType()
[ <AS> alias = SimpleIdentifier()
]
[ <COMMENT> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}]
{
SqlTableColumn tableColumn = new SqlTableColumn(name, type , alias ,comment , getPos());
list.add(tableColumn);
}
}

Table column handling starts from the TableColumn() method:

  1. Plain columns are delegated to TableColumn2(context.columnList)
  2. The primary key is delegated to PrimaryKey()
  3. The watermark is delegated to Watermark(context)
  4. Computed columns are delegated to ComputedColumn(context)
  5. The dimension-table marker is delegated to SideFlag()

Plain columns are handled starting from the TableColumn2() method:

  1. Read the column name
  2. Parse the column type as FlinkDataType, Flink's custom data type
  3. Read the token "AS" and the alias
  4. Read the column comment
  5. Build the Java object SqlTableColumn for the column
public class SqlParserImpl extends SqlAbstractParserImpl implements SqlParserImplConstants {

/**
* Parses one table column entry.
*/
final public void TableColumn(TableCreationContext context) throws ParseException {
if (jj_2_5(3)) {
// 1. Plain column: delegate to TableColumn2(context.columnList)
TableColumn2(context.columnList);
} else {
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case PRIMARY:
// 2. Primary key: delegate to PrimaryKey()
context.primaryKeyList = PrimaryKey();
break;
case WATERMARK:
// 3. Watermark: delegate to Watermark(context)
Watermark(context);
break;
case A:
case ABSENT:
case ABSOLUTE:
...
case IDENTIFIER:
case UNICODE_QUOTED_IDENTIFIER:
// 4. Computed column: delegate to ComputedColumn(context)
ComputedColumn(context);
break;
default:
jj_la1[23] = jj_gen;
// 5. Dimension-table marker: delegate to SideFlag()
context.sideFlag = SideFlag();
}
}
}

final public void TableColumn2(List<SqlNode> list) throws ParseException {
SqlParserPos pos;
SqlIdentifier name;
SqlDataTypeSpec type;
SqlIdentifier alias = null;
SqlCharStringLiteral comment = null;

// 1. Read the column name
name = SimpleIdentifier();

// 2. Parse the column type as FlinkDataType, Flink's custom data type
type = FlinkDataType();


switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
// 3. Read the token "AS" and the alias
case AS:
jj_consume_token(AS);
alias = SimpleIdentifier();
break;
default:
jj_la1[24] = jj_gen;
;
}


switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
// 4. Read the column comment
case COMMENT:
jj_consume_token(COMMENT);
jj_consume_token(QUOTED_STRING);
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
break;
default:
jj_la1[25] = jj_gen;
;
}
// 5. Build the Java object SqlTableColumn for the column
SqlTableColumn tableColumn = new SqlTableColumn(name, type , alias ,comment , getPos());
list.add(tableColumn);
}


}
FlinkDataType

Flink SQL defines a number of custom types:

/**
* Parse a Flink data type with nullable options, NULL -> nullable, NOT NULL -> not nullable.
* Default to be nullable.
*/
SqlDataTypeSpec FlinkDataType() :
{
final SqlIdentifier typeName;
SqlIdentifier collectionTypeName = null;
int scale = -1;
int precision = -1;
String charSetName = null;
final Span s;
boolean nullable = true;
boolean elementNullable = true;
}
{
typeName = FlinkTypeName() {
s = span();
}
[
<LPAREN>
precision = UnsignedIntLiteral()
[
<COMMA>
scale = UnsignedIntLiteral()
]
<RPAREN>
]
elementNullable = NullableOpt()
[
collectionTypeName = FlinkCollectionsTypeName()
nullable = NullableOpt()
]
{
if (null != collectionTypeName) {
return new FlinkSqlDataTypeSpec(
collectionTypeName,
typeName,
precision,
scale,
charSetName,
nullable,
elementNullable,
s.end(collectionTypeName));
}
nullable = elementNullable;
return new FlinkSqlDataTypeSpec(typeName,
precision,
scale,
charSetName,
null,
nullable,
elementNullable,
s.end(this));
}
}
PrimaryKey

Parses the primary key information; the key may span several columns, e.g. PRIMARY KEY(id, name)

/**
* Primary key information.
*/
SqlNodeList PrimaryKey() :
{
List<SqlNode> pkList = new ArrayList<SqlNode>();

SqlParserPos pos;
SqlIdentifier columnName;
}
{
<PRIMARY> { pos = getPos(); } <KEY> <LPAREN>
columnName = SimpleIdentifier() { pkList.add(columnName); }
(<COMMA> columnName = SimpleIdentifier() { pkList.add(columnName); })*
<RPAREN>
{
return new SqlNodeList(pkList, pos.plus(getPos()));
}
}

Primary key handling starts from the PrimaryKey() method:

  1. Consume the token "PRIMARY"
  2. Consume the token "KEY"
  3. Consume the token "("
  4. Read the key column names into pkList, with multiple columns separated by commas
  5. Return a SqlNodeList
public class SqlParserImpl extends SqlAbstractParserImpl implements SqlParserImplConstants {

/**
* Primary key information.
*/
final public SqlNodeList PrimaryKey() throws ParseException {
List<SqlNode> pkList = new ArrayList<SqlNode>();

SqlParserPos pos;
SqlIdentifier columnName;

// 1. Consume the token "PRIMARY"
jj_consume_token(PRIMARY);
pos = getPos();

// 2. Consume the token "KEY"
jj_consume_token(KEY);

// 3. Consume the token "("
jj_consume_token(LPAREN);

// 4. Read the key column names into pkList, separated by commas
columnName = SimpleIdentifier();
pkList.add(columnName);
label_5:
while (true) {
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case COMMA:
;
break;
default:
jj_la1[26] = jj_gen;
break label_5;
}
jj_consume_token(COMMA);
columnName = SimpleIdentifier();
pkList.add(columnName);
}
jj_consume_token(RPAREN);
// 5. Return a SqlNodeList
{if (true) return new SqlNodeList(pkList, pos.plus(getPos()));}
throw new Error("Missing return statement in function");
}

}
Watermark

Parses the watermark; event-time windows require one, e.g.: WATERMARK FOR ts AS withOffset( ts , '120000' )

void Watermark(TableCreationContext context) :
{
SqlNode identifier;
SqlNode expr;
boolean hidden = false;
SqlParserPos pos;
SqlIdentifier eventTimeField;
SqlNode maxOutOrderless;
}
{
<WATERMARK> <FOR> eventTimeField = SimpleIdentifier() <AS> <withOffset>
<LPAREN>
eventTimeField = SimpleIdentifier() { context.eventTimeField = eventTimeField; }
<COMMA> maxOutOrderless = StringLiteral() {context.maxOutOrderless = maxOutOrderless; }
<RPAREN>
}

Watermark handling starts from the Watermark(context) method:

  1. Consume the token "WATERMARK"
  2. Consume the token "FOR"
  3. Read the event-time field name
  4. Consume the token "AS"
  5. Consume the token "withOffset"
  6. Consume the token "("
  7. Read the event-time field name again
  8. Consume the token ","
  9. Read the allowed lateness in milliseconds
  10. Consume the token ")"
public class SqlParserImpl extends SqlAbstractParserImpl implements SqlParserImplConstants {

final public void Watermark(TableCreationContext context) throws ParseException {
SqlNode identifier;
SqlNode expr;
boolean hidden = false;
SqlParserPos pos;
SqlIdentifier eventTimeField;
SqlNode maxOutOrderless;

// 1. Consume the token "WATERMARK"
jj_consume_token(WATERMARK);

// 2. Consume the token "FOR"
jj_consume_token(FOR);

// 3. Read the event-time field name
eventTimeField = SimpleIdentifier();

// 4. Consume the token "AS"
jj_consume_token(AS);

// 5. Consume the token "withOffset"
jj_consume_token(withOffset);

// 6. Consume the token "("
jj_consume_token(LPAREN);

// 7. Read the event-time field name again
eventTimeField = SimpleIdentifier();
context.eventTimeField = eventTimeField;

// 8. Consume the token ","
jj_consume_token(COMMA);

// 9. Read the allowed lateness in milliseconds
maxOutOrderless = StringLiteral();
context.maxOutOrderless = maxOutOrderless;

// 10. Consume the token ")"
jj_consume_token(RPAREN);
}

}
ComputedColumn

Parses a computed column, e.g.: ts AS TO_TIMESTAMP(xctime,'yyyyMMddHHmmss')

void ComputedColumn(TableCreationContext context) :
{
SqlNode identifier = null;
SqlNode expr;
boolean hidden = false;
SqlParserPos pos;
}
{
identifier = SimpleIdentifier() {pos = getPos();}
<AS>
expr = Expression(ExprContext.ACCEPT_SUB_QUERY) {
expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
context.columnList.add(expr);
}
}

Computed column handling starts from the ComputedColumn(context) method:

  1. Read the computed column name
  2. Consume the token "AS"
  3. Parse the column expression via Expression()
public class SqlParserImpl extends SqlAbstractParserImpl implements SqlParserImplConstants {

final public void ComputedColumn(TableCreationContext context) throws ParseException {
SqlNode identifier = null;
SqlNode expr;
boolean hidden = false;
SqlParserPos pos;

// 1. Read the computed column name
identifier = SimpleIdentifier();
pos = getPos();

// 2. Consume the token "AS"
jj_consume_token(AS);

// 3. Parse the column expression via Expression()
expr = Expression(ExprContext.ACCEPT_SUB_QUERY);
expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
context.columnList.add(expr);
}

}
SideFlag

Parses the dimension-table marker; if it returns true, the statement creates a dimension (side) table

/**
* Dimension (side) table marker.
*/
boolean SideFlag() :
{
SqlParserPos pos;
SqlIdentifier columnName;
}
{
<PERIOD> { pos = getPos(); } <FOR> <SYSTEM_TIME> { return true; }
|
{ return false; }
}
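The generated counterpart is short. The sketch below shows roughly what JavaCC emits for the two alternatives; it is written by analogy with the generated methods above, and the exact jj_* bookkeeping indices are illustrative, not taken from the real SqlParserImpl:

final public boolean SideFlag() throws ParseException {
    SqlParserPos pos;
    switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
    case PERIOD:
        // First alternative: PERIOD FOR SYSTEM_TIME marks a dimension (side) table
        jj_consume_token(PERIOD);
        pos = getPos();
        jj_consume_token(FOR);
        jj_consume_token(SYSTEM_TIME);
        {if (true) return true;}
        break;
    default:
        // Second (empty) alternative: any other token means no side-table marker
        jj_la1[27] = jj_gen;   // illustrative index
        {if (true) return false;}
    }
    throw new Error("Missing return statement in function");
}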
TableProperties

Parses the WITH property list:

/**
* Parses the table properties.
*/
SqlNodeList TableProperties():
{
SqlNode property;
final List<SqlNode> proList = new ArrayList<SqlNode>();
final Span span;
}
{
<LPAREN> { span = span(); }
[
property = TableOption()
{
proList.add(property);
}
(
<COMMA> property = TableOption()
{
proList.add(property);
}
)*
]
<RPAREN>
{ return new SqlNodeList(proList, span.end(this)); }
}

SqlNode TableOption() :
{
SqlIdentifier key;
SqlNode value;
SqlParserPos pos;
}
{
key = CompoundIdentifier()
{ pos = getPos(); }
<EQ> value = StringLiteral()
{
return new SqlTableOption(key, value, getPos());
}
}

Customizable field types

  • Each of the following is wrapped into Flink's custom FlinkSqlDataTypeSpec
SqlBytesType
SqlIdentifier SqlBytesType() :
{
}
{
<BYTES> { return new SqlBytesType(getPos()); }
}
SqlBytesType.java (git link)
public class SqlBytesType extends SqlIdentifier implements ExtendedSqlType{
public SqlBytesType(SqlParserPos pos) {
super("BYTES", pos);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("BYTES");
}
}
SqlStringType
SqlIdentifier SqlStringType() :
{
}
{
<STRING> { return new SqlStringType(getPos()); }
}
SqlStringType.java (git link)
public class SqlStringType extends SqlIdentifier implements ExtendedSqlType {
public SqlStringType(SqlParserPos pos) {
super("STRING", pos);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("STRING");
}
}
SqlArrayType
SqlIdentifier SqlArrayType() :
{
SqlParserPos pos;
SqlDataTypeSpec elementType;
boolean nullable = true;
}
{
<ARRAY> { pos = getPos(); }
<LT>
elementType = FlinkDataType()
<GT>
{
return new SqlArrayType(pos, elementType);
}
}
SqlArrayType.java (git link)
public class SqlArrayType extends SqlIdentifier implements ExtendedSqlType {

private final SqlDataTypeSpec elementType;

public SqlArrayType(SqlParserPos pos, SqlDataTypeSpec elementType) {
super(SqlTypeName.ARRAY.getName(), pos);
this.elementType = elementType;
}

public SqlDataTypeSpec getElementType() {
return elementType;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("ARRAY<");
ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
writer.keyword(">");
}
}
SqlMultisetType
SqlIdentifier SqlMultisetType() :
{
SqlParserPos pos;
SqlDataTypeSpec elementType;
boolean nullable = true;
}
{
<MULTISET> { pos = getPos(); }
<LT>
elementType = FlinkDataType()
<GT>
{
return new SqlMultisetType(pos, elementType);
}
}
SqlMultisetType.java (git link)
public class SqlMultisetType extends SqlIdentifier implements ExtendedSqlType{
private final SqlDataTypeSpec elementType;

public SqlMultisetType(SqlParserPos pos, SqlDataTypeSpec elementType) {
super(SqlTypeName.MULTISET.getName(), pos);
this.elementType = elementType;
}

public SqlDataTypeSpec getElementType() {
return elementType;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("MULTISET<");
ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
writer.keyword(">");
}
}
SqlMapType
SqlIdentifier SqlMapType() :
{
SqlDataTypeSpec keyType;
SqlDataTypeSpec valType;
boolean nullable = true;
}
{
<MAP>
<LT>
keyType = FlinkDataType()
<COMMA>
valType = FlinkDataType()
<GT>
{
return new SqlMapType(getPos(), keyType, valType);
}
}
SqlMapType.java (git link)
public class SqlMapType extends SqlIdentifier implements ExtendedSqlType{
private final SqlDataTypeSpec keyType;
private final SqlDataTypeSpec valType;

public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
super(SqlTypeName.MAP.getName(), pos);
this.keyType = keyType;
this.valType = valType;
}

public SqlDataTypeSpec getKeyType() {
return keyType;
}

public SqlDataTypeSpec getValType() {
return valType;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("MAP");
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
writer.sep(",");
ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
writer.sep(",");
ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
writer.endList(frame);
}
}
SqlRowType
/**
* Parse Row type, we support both Row(name1 type1, name2 type2) and Row<name1 type1, name2 type2>.
* Every item type can have suffix of `NULL` or `NOT NULL` to indicate if this type is nullable.
* i.e. Row(f0 int not null, f1 varchar null).
*/
SqlIdentifier SqlRowType() :
{
List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>();
}
{
<ROW>
(
<NE>
|
<LT> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT>
|
<LPAREN> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <RPAREN>
)
{
return new SqlRowType(getPos(), fieldNames, fieldTypes, comments);
}
}

/**
* Parse a "name1 type1 ['i'm a comment'], name2 type2 ..." list.
*/
void FieldNameTypeCommaList(
List<SqlIdentifier> fieldNames,
List<SqlDataTypeSpec> fieldTypes,
List<SqlCharStringLiteral> comments) :
{
SqlIdentifier fName;
SqlDataTypeSpec fType;
}
{
[
fName = SimpleIdentifier()
fType = FlinkDataType()
{
fieldNames.add(fName);
fieldTypes.add(fType);
}
(
<QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comments.add(SqlLiteral.createCharString(p, getPos()));
}
|
{ comments.add(null); }
)
]
(
<COMMA>
fName = SimpleIdentifier()
fType = FlinkDataType()
{
fieldNames.add(fName);
fieldTypes.add(fType);
}
(
<QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comments.add(SqlLiteral.createCharString(p, getPos()));
}
|
{ comments.add(null); }
)
)*
}
SqlRowType.java (git link)
@Data
public class SqlRowType extends SqlIdentifier implements ExtendedSqlType{
private final List<SqlIdentifier> fieldNames;
private final List<SqlDataTypeSpec> fieldTypes;
private final List<SqlCharStringLiteral> comments;

public SqlRowType(SqlParserPos pos,
List<SqlIdentifier> fieldNames,
List<SqlDataTypeSpec> fieldTypes,
List<SqlCharStringLiteral> comments) {
super(SqlTypeName.ROW.getName(), pos);
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
this.comments = comments;
}

public int getArity() {
return fieldNames.size();
}

public SqlIdentifier getFieldName(int i) {
return fieldNames.get(i);
}

public SqlDataTypeSpec getFieldType(int i) {
return fieldTypes.get(i);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.print("ROW");
if (getFieldNames().size() == 0) {
writer.print("<>");
} else {
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
int i = 0;
for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
writer.sep(",", false);
p.left.unparse(writer, 0, 0);
ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
if (comments.get(i) != null) {
comments.get(i).unparse(writer, leftPrec, rightPrec);
}
i += 1;
}
writer.endList(frame);
}
}
}
SqlTimeType
SqlIdentifier SqlTimeType() :
{
int precision = -1;
boolean withLocalTimeZone = false;
}
{
<TIME>
(
<LPAREN> precision = UnsignedIntLiteral() <RPAREN>
|
{ precision = -1; }
)
withLocalTimeZone = WithLocalTimeZone()
{ return new SqlTimeType(getPos(), precision, withLocalTimeZone); }
}
SqlTimeType.java (git link)
public class SqlTimeType extends SqlIdentifier implements ExtendedSqlType {
private final int precision;
private final boolean withLocalTimeZone;

public SqlTimeType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
super(getTypeName(withLocalTimeZone), pos);
this.precision = precision;
this.withLocalTimeZone = withLocalTimeZone;
}

private static String getTypeName(boolean withLocalTimeZone) {
if (withLocalTimeZone) {
return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE.name();
} else {
return SqlTypeName.TIME.name();
}
}

public SqlTypeName getSqlTypeName() {
if (withLocalTimeZone) {
return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE;
} else {
return SqlTypeName.TIME;
}
}

public int getPrecision() {
return this.precision;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword(SqlTypeName.TIME.name());
if (this.precision >= 0) {
final SqlWriter.Frame frame =
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
writer.print(precision);
writer.endList(frame);
}
if (this.withLocalTimeZone) {
writer.keyword("WITH LOCAL TIME ZONE");
}
}
}
SqlTimestampType
SqlIdentifier SqlTimestampType() :
{
int precision = -1;
boolean withLocalTimeZone = false;
}
{
<TIMESTAMP>
(
<LPAREN> precision = UnsignedIntLiteral() <RPAREN>
|
{ precision = -1; }
)
withLocalTimeZone = WithLocalTimeZone()
{ return new SqlTimestampType(getPos(), precision, withLocalTimeZone); }
}
SqlTimestampType.java (git link)
public class SqlTimestampType extends SqlIdentifier implements ExtendedSqlType {
private final int precision;
private final boolean withLocalTimeZone;

public SqlTimestampType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
super(getTypeName(withLocalTimeZone), pos);
this.precision = precision;
this.withLocalTimeZone = withLocalTimeZone;
}

private static String getTypeName(boolean withLocalTimeZone) {
if (withLocalTimeZone) {
return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name();
} else {
return SqlTypeName.TIMESTAMP.name();
}
}

public SqlTypeName getSqlTypeName() {
if (withLocalTimeZone) {
return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
} else {
return SqlTypeName.TIMESTAMP;
}
}

public int getPrecision() {
return this.precision;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword(SqlTypeName.TIMESTAMP.name());
if (this.precision >= 0) {
final SqlWriter.Frame frame =
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
writer.print(precision);
writer.endList(frame);
}
if (this.withLocalTimeZone) {
writer.keyword("WITH LOCAL TIME ZONE");
}
}
}

Testing

SqlCreateKafkaTableTest#createKafkaSourceWithComputedColumnForWatermark (git link)

public class SqlCreateKafkaTableTest extends BaseParser {

@Test
public void createKafkaSourceWithComputedColumnForWatermark() {
String sql = "CREATE TABLE table_source(\n" +
" channel varchar, -- 频道\n" +
" pv int, -- 点击次数\n" +
" xctime varchar, -- yyyyMMddHHmmss格式时间戳,字符串类型\n" +
" ts AS TO_TIMESTAMP(xctime,'yyyyMMddHHmmss'), -- rowtime,必须为TIMESTAMP类型\n" +
" WATERMARK FOR ts AS withOffset( ts , '120000' ) --watermark计算方法,允许2分钟的乱序时间,即允许数据迟到2分钟\n" +
" )WITH(\n" +
" type='kafka11',\n" +
" kafka.bootstrap.servers='mwt:9092',\n" +
" kafka.auto.offset.reset='latest',\n" +
" kafka.kerberos.enabled='false',\n" +
" kafka.data.type='json',\n" +
" kafka.topic='table_source',\n" +
" parallelism='1'\n" +
" )\n";

// Create the parser
SqlParser sqlParser = SqlParser.create(sql,
// parser configuration
SqlParser.configBuilder()
// set the parser factory
.setParserFactory(SqlParserImpl.FACTORY)
.setQuoting(Quoting.BACK_TICK)
.setUnquotedCasing(Casing.UNCHANGED)
.setQuotedCasing(Casing.UNCHANGED)
.setConformance(conformance)
.build());

final SqlNode sqlNode;
try {
// parse the SQL statement
sqlNode = sqlParser.parseStmt();
} catch (SqlParseException e) {
throw new RuntimeException("Error while parsing SQL: " + sql, e);
}

// Cast the SqlNode straight to the corresponding result class
assert sqlNode instanceof SqlCreateTable;
final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;

SqlIdentifier tableName = sqlCreateTable.getTableName();
LOG.debug("tableName: {}", tableName);

SqlNodeList columns = sqlCreateTable.getColumnList();
LOG.debug("columns: {}", columns);

// set with properties
SqlNodeList propertyList = sqlCreateTable.getPropertyList();
Map<String, String> properties = new HashMap<>();
if (propertyList != null) {
propertyList.getList().forEach(p ->
properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
((SqlTableOption) p).getValueString()));
}
LOG.debug("properties: {}", properties);

LOG.debug("eventTimeField:{}", sqlCreateTable.getEventTimeField());

LOG.debug("maxOutOrderless:{}", sqlCreateTable.getMaxOutOrderless());

String nodeInfo = sqlNode.toString();
LOG.debug("test allows secondary parsing: {}", nodeInfo);
}
}

CREATE VIEW and CREATE FUNCTION work the same way; see the sample code for details.

CREATE VIEW

  • SQL syntax
CREATE VIEW table_sink_view AS
SELECT
a.*,b.info
FROM
table_source a
JOIN table_side b ON a.name=b.name;
  • JavaCC grammar template
/**
* Parses a create view or replace existing view statement.
* CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS select_statement
*/
SqlCreate SqlCreateView(Span s, boolean replace) : {
SqlIdentifier viewName;
SqlCharStringLiteral comment = null;
SqlNode query;
SqlNodeList fieldList = SqlNodeList.EMPTY;
}
{
<VIEW>
viewName = CompoundIdentifier()
[
fieldList = ParenthesizedSimpleIdentifierList()
]
[ <COMMENT> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}
]
<AS>
query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
return new SqlCreateView(s.pos(), viewName, fieldList, query, replace, comment);
}
}

CREATE FUNCTION

  • SQL syntax
CREATE FUNCTION StringLengthUdf AS 'com.dtwave.example.flink.udx.udf.StringLengthUdf';
  • JavaCC grammar template
/**
* Creates a function.
*/
SqlCreateFunction SqlCreateFunction(Span s, boolean replace):
{
SqlParserPos pos;
SqlIdentifier functionName;
SqlNode className;
}
{
{ pos = getPos(); }
<FUNCTION> functionName = CompoundIdentifier()
<AS> className = StringLiteral()
{
return new SqlCreateFunction(s.pos(),functionName,className);
}
}
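The result class SqlCreateFunction is not reproduced in this article. A minimal sketch of what it needs to look like, written by analogy with SqlCreateTable above (the real com.matty.flink.sql.parser.ddl.SqlCreateFunction may differ in detail):

public class SqlCreateFunction extends SqlCreate {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("CREATE FUNCTION", SqlKind.CREATE_FUNCTION);

    private final SqlIdentifier functionName;
    private final SqlNode className;

    public SqlCreateFunction(SqlParserPos pos, SqlIdentifier functionName, SqlNode className) {
        super(OPERATOR, pos, false, false);
        this.functionName = requireNonNull(functionName, "Function name is missing");
        this.className = requireNonNull(className, "Class name is missing");
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return ImmutableNullableList.of(functionName, className);
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("CREATE FUNCTION");
        functionName.unparse(writer, leftPrec, rightPrec);
        writer.keyword("AS");
        className.unparse(writer, leftPrec, rightPrec);
    }
}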

References

javacc help