Flink SQL with Calcite

Apache Calcite is a dynamic data management framework. It can be used to build the SQL-parsing layer of a database system; it does not itself provide data storage or data processing.
Calcite merely wraps queries against various databases (different data sources) and exposes a single, unified query entry point. You can think of Calcite as a database without a storage layer; it does not need to care about any file format.
Flink SQL uses Calcite and extends it to parse and validate SQL statements.
Calcite's streaming SQL currently supports SELECT, WHERE, GROUP BY, HAVING, UNION ALL and ORDER BY, as well as the FLOOR and CEIL functions.
This article shows, through code, how Calcite parses DDL statements and how javacc translates into Java.

Apache Calcite


Quick start

Build the Calcite source

$ git clone https://github.com/apache/calcite.git
$ cd calcite
$ mvn install -DskipTests -Dcheckstyle.skip=true

csv

$ cd example/csv
model.json

Connect to Calcite with sqlline, the SQL shell script that ships with this project:

./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin

The content of target/test-classes/model.json is as follows:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales"
      }
    }
  ]
}

The files under the target/test-classes/sales directory are mapped to three tables:

DEPTS.csv
EMPS.csv.gz
SDEPTS.csv
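
Besides sqlline, the same model can be queried programmatically over JDBC. A minimal sketch, assuming calcite-core and the CSV example adapter are on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CsvJdbcDemo {
    public static void main(String[] args) throws Exception {
        // Same model file that sqlline used; Calcite's JDBC driver registers itself.
        String url = "jdbc:calcite:model=target/test-classes/model.json";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name FROM emps")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}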

Run a metadata query:

0: jdbc:calcite:model=target/test-classes/mod> !tables
+-----------+-------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+-------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| | SALES | DEPTS | TABLE | | | | | | |
| | SALES | EMPS | TABLE | | | | | | |
| | SALES | SDEPTS | TABLE | | | | | | |
| | metadata | COLUMNS | SYSTEM TABLE | | | | | | |
| | metadata | TABLES | SYSTEM TABLE | | | | | | |
+-----------+-------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+

Query the data in the emps table:

0: jdbc:calcite:model=target/test-classes/mod> SELECT * FROM emps;
+-------+-------+--------+--------+---------------+-------+------+---------+---------+------------+
| EMPNO | NAME | DEPTNO | GENDER | CITY | EMPID | AGE | SLACKER | MANAGER | JOINEDAT |
+-------+-------+--------+--------+---------------+-------+------+---------+---------+------------+
| 100 | Fred | 10 | | | 30 | 25 | true | false | 1996-08-03 |
| 110 | Eric | 20 | M | San Francisco | 3 | 80 | | false | 2001-01-01 |
| 110 | John | 40 | M | Vancouver | 2 | null | false | true | 2002-05-03 |
| 120 | Wilma | 20 | F | | 1 | 5 | | true | 2005-09-07 |
| 130 | Alice | 40 | F | Vancouver | 2 | null | false | true | 2007-01-01 |
+-------+-------+--------+--------+---------------+-------+------+---------+---------+------------+
5 rows selected (1.377 seconds)

Add JOIN and GROUP BY operations:

0: jdbc:calcite:model=target/test-classes/mod> SELECT d.name, COUNT(*)
. . . . . . . . . . . . . . . . . . semicolon> FROM emps AS e
. . . . . . . . . . . . . . . . . . semicolon> JOIN depts AS d ON e.deptno = d.deptno
. . . . . . . . . . . . . . . . . . semicolon> GROUP BY d.name;
+-----------+--------+
| NAME | EXPR$1 |
+-----------+--------+
| Sales | 1 |
| Marketing | 2 |
+-----------+--------+
2 rows selected (0.353 seconds)

The VALUES operation produces a single row and is a convenient way to test expressions and built-in SQL functions:

0: jdbc:calcite:model=target/test-classes/mod> VALUES CHAR_LENGTH('Hello, ' || 'world!');
+--------+
| EXPR$0 |
+--------+
| 13 |
+--------+
1 row selected (0.09 seconds)
model-with-view.json
./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model-with-view.json admin admin

The content of target/test-classes/model-with-view.json is as follows:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales"
      },
      "tables": [
        {
          "name": "FEMALE_EMPS",
          "type": "view",
          "sql": "SELECT * FROM emps WHERE gender = 'F'"
        }
      ]
    }
  ]
}

Run the metadata query again; there is now an additional FEMALE_EMPS view:

0: jdbc:calcite:model=target/test-classes/mod> !tables
+-----------+-------------+-------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+-------------+-------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| | SALES | DEPTS | TABLE | | | | | | |
| | SALES | EMPS | TABLE | | | | | | |
| | SALES | SDEPTS | TABLE | | | | | | |
| | SALES | FEMALE_EMPS | VIEW | | | | | | |
| | metadata | COLUMNS | SYSTEM TABLE | | | | | | |
| | metadata | TABLES | SYSTEM TABLE | | | | | | |
+-----------+-------------+-------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+

Query the FEMALE_EMPS view exactly as you would a table:

0: jdbc:calcite:model=target/test-classes/mod> SELECT e.name, d.name FROM female_emps AS e JOIN depts AS d on e.deptno = d.deptno;
+-------+-----------+
| NAME | NAME |
+-------+-----------+
| Wilma | Marketing |
+-------+-----------+
1 row selected (0.945 seconds)
model-with-custom-table.json

A custom table is implemented by user-defined code.

./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model-with-custom-table.json admin admin

The content of target/test-classes/model-with-custom-table.json is as follows:

{
  "version": "1.0",
  "defaultSchema": "CUSTOM_TABLE",
  "schemas": [
    {
      "name": "CUSTOM_TABLE",
      "tables": [
        {
          "name": "EMPS",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.csv.CsvTableFactory",
          "operand": {
            "file": "sales/EMPS.csv.gz",
            "flavor": "scannable"
          }
        }
      ]
    }
  ]
}

Run the metadata query:

1: jdbc:calcite:model=target/test-classes/mod> !tables
+-----------+--------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+--------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| | CUSTOM_TABLE | EMPS | TABLE | | | | | | |
| | metadata | COLUMNS | SYSTEM TABLE | | | | | | |
| | metadata | TABLES | SYSTEM TABLE | | | | | | |
+-----------+--------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+

Query the data in the custom emps table:

1: jdbc:calcite:model=target/test-classes/mod> SELECT empno, name FROM custom_table.emps;
+-------+-------+
| EMPNO | NAME |
+-------+-------+
| 100 | Fred |
| 110 | Eric |
| 110 | John |
| 120 | Wilma |
| 130 | Alice |
+-------+-------+
5 rows selected (0.049 seconds)
Query plan optimization

// TODO: to be continued

sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin
0: jdbc:calcite:model=target/test-classes/mod> explain plan for select name from emps;
+------------------------------------------------------------------------------------------------------------------------+
| PLAN |
+------------------------------------------------------------------------------------------------------------------------+
| EnumerableCalc(expr#0..9=[{inputs}], NAME=[$t1])
EnumerableInterpreter
BindableTableScan(table=[[SALES, EMPS]])
|
+------------------------------------------------------------------------------------------------------------------------+
1 row selected (0.759 seconds)
0: jdbc:calcite:model=target/test-classes/mod> !connect jdbc:calcite:model=target/test-classes/smart.json admin admin
1: jdbc:calcite:model=target/test-classes/sma> explain plan for select name from emps;
+----------------------------------------------------+
| PLAN |
+----------------------------------------------------+
| CsvTableScan(table=[[SALES, EMPS]], fields=[[1]])
|
+----------------------------------------------------+
1 row selected (0.059 seconds)

The content of target/test-classes/smart.json is as follows:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales",
        // This line creates a CsvSchema whose createTable method builds
        // CsvTranslatableTable instances instead of CsvScannableTable.
        "flavor": "TRANSLATABLE"
      }
    }
  ]
}

Algebra

// TODO: to be continued

Adapters

// TODO: to be continued

Calcite extends SQL and relational algebra to support streaming queries. A stream is a continuous, never-ending collection of flowing records. Unlike tables, streams are usually not stored on disk; they flow over the network and are held in memory for only a short time.
Streams complement tables: a stream represents the present and the future, while a table represents the past. As with tables, you usually want to query streams with a high-level language based on relational algebra, validate them against a schema, and optimize them to exploit the available resources and algorithms.

Stream-to-table JOIN

Joining a stream to a table is straightforward as long as the table's contents never change. This query enriches the stream of orders with each product's unit price, which is commonly called building a wide table.

SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
p.name, p.unitPrice
FROM Orders AS o
JOIN Products AS p
ON o.productId = p.productId;

rowtime | productId | orderId | units | name | unitPrice
----------+-----------+---------+-------+ -------+-----------
10:17:00 | 30 | 5 | 4 | Cheese | 17
10:17:05 | 10 | 6 | 1 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | Wine | 6
10:18:07 | 30 | 8 | 20 | Cheese | 17
11:02:00 | 10 | 9 | 6 | Beer | 0.25
11:04:00 | 10 | 10 | 1 | Beer | 0.25
11:09:30 | 40 | 11 | 12 | Bread | 100
11:24:11 | 10 | 12 | 4 | Beer | 0.25

But what if the table does change? Suppose, for example, the unit price of product 10 rises to 0.35 at 11:00. Orders placed before 11:00 should see the old price, and orders placed after 11:00 the new one.
One way to implement this is a table that records the start and end effective dates of each version, i.e. a versioned dimension table.

SELECT STREAM *
FROM Orders AS o
JOIN ProductVersions AS p
ON o.productId = p.productId
AND o.rowtime BETWEEN p.startDate AND p.endDate

rowtime | productId | orderId | units | productId1 | name | unitPrice
----------+-----------+---------+-------+ -----------+--------+-----------
10:17:00 | 30 | 5 | 4 | 30 | Cheese | 17
10:17:05 | 10 | 6 | 1 | 10 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | 20 | Wine | 6
10:18:07 | 30 | 8 | 20 | 30 | Cheese | 17
11:02:00 | 10 | 9 | 6 | 10 | Beer | 0.35
11:04:00 | 10 | 10 | 1 | 10 | Beer | 0.35
11:09:30 | 40 | 11 | 12 | 40 | Bread | 100
11:24:11 | 10 | 12 | 4 | 10 | Beer | 0.35
Stream-to-stream JOIN

Joining two streams makes sense if the join condition somehow forces the two streams to remain a bounded distance apart.
In the following query, the ship date must fall within one hour of the order date:

SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;

rowtime | productId | orderId | shipTime
----------+-----------+---------+----------
10:17:00 | 30 | 5 | 10:55:00
10:17:05 | 10 | 6 | 10:20:00
11:02:00 | 10 | 9 | 11:58:00
11:24:11 | 10 | 12 | 11:44:00

Quite a few orders are missing because they were not shipped within an hour. By the time the system receives the product-10 order with timestamp 11:24:11, it has already evicted from its hash table the earlier orders, up to and including order 8 with timestamp 10:18:07.

Streaming SQL

sql-window

Window concepts in streaming SQL:

Tumbling Window

Assigns elements to fixed-length windows; a tumbling window has a fixed size, and windows do not overlap.

Sliding Window

Assigns elements to fixed-length windows and additionally specifies the slide interval at which new windows start, so windows may overlap.

Session Window

Groups elements by session. Session windows do not overlap and have no fixed start or end time; a session window closes once no new element has arrived for a certain period. The sketch below shows the three corresponding window assigners.
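
A minimal runnable sketch of the three concepts, assuming the Flink 1.x Java DataStream API (the window sizes are arbitrary):

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAssignerDemo {
    public static void main(String[] args) {
        // Tumbling: fixed one-minute windows, elements never overlap.
        System.out.println(TumblingEventTimeWindows.of(Time.minutes(1)));
        // Sliding: one-minute windows that start every ten seconds, so they overlap.
        System.out.println(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
        // Session: a window closes after a 30-second gap with no new elements.
        System.out.println(EventTimeSessionWindows.withGap(Time.seconds(30)));
    }
}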

watermark

Watermarks are essentially unrelated to Calcite; they belong to streaming SQL. Event-time disorder is a common occurrence in data streams.

Flink's watermark designs:

  1. Periodic watermark

A periodic watermark generates a new watermark at a fixed time interval, whether or not new messages arrive. A number of messages flow in between two generations, and the user can compute the new watermark from that data.

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

  val maxOutOfOrderness = 3500L // 3.5 seconds

  var currentMaxTimestamp: Long = _

  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = element.getCreationTime()
    currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
    timestamp
  }

  override def getCurrentWatermark(): Watermark = {
    // return the watermark as current highest timestamp minus the out-of-orderness bound
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }
}

Here extractTimestamp extracts the event time from a message, and getCurrentWatermark generates the new watermark; a new watermark takes effect only if it is larger than the current one. Each window operator gets its own instance of this class, so instance fields can carry state, such as the current maximum timestamp in the example above.
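
For completeness, here is how such an assigner is attached to a stream, as a minimal runnable Java sketch. The event class is a stand-in for MyEvent above, and Flink's built-in BoundedOutOfOrdernessTimestampExtractor plays the role of the hand-written periodic assigner:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkWiring {
    /** Minimal event with a creation time, standing in for MyEvent. */
    public static class MyEvent {
        public long creationTime;
        public MyEvent() {}
        public MyEvent(long creationTime) { this.creationTime = creationTime; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.fromElements(new MyEvent(1L), new MyEvent(5L), new MyEvent(3L))
           // allow 3.5 seconds of out-of-orderness, as in the Scala example
           .assignTimestampsAndWatermarks(
               new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.milliseconds(3500)) {
                   @Override
                   public long extractTimestamp(MyEvent e) {
                       return e.creationTime;
                   }
               })
           .print();
        env.execute("watermark-wiring");
    }
}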

  2. Punctuated watermark

A punctuated watermark triggers the generation of a new watermark from special marker events in the data stream. Window firing is then independent of time; it depends on when a marker event is received.

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    element.getCreationTime
  }

  override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
    if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
  }
}

Here extractTimestamp extracts the event time from a message, and checkAndGetNextWatermark checks whether the event is a marker event, generating a new watermark if so. Unlike a periodic watermark, where getCurrentWatermark is invoked on a timer, checkAndGetNextWatermark is invoked for every event received; if its return value is non-null and the new watermark is larger than the current one, window computation is triggered.

The Parser

fmpp

FMPP is a template-based generator that uses FreeMarker as its template engine.

Add the Maven plugin
<plugin>
  <configuration>
    <!-- configuration file -->
    <cfgFile>src/main/codegen/config.fmpp</cfgFile>
    <!-- output directory -->
    <outputDirectory>target/generated-sources/fmpp/</outputDirectory>
    <!-- directory containing the templates -->
    <templateDirectory>src/main/codegen/templates</templateDirectory>
  </configuration>
  <groupId>com.googlecode.fmpp-maven-plugin</groupId>
  <artifactId>fmpp-maven-plugin</artifactId>
  <version>1.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>generate</goal>
      </goals>
    </execution>
  </executions>
  <dependencies>
    <dependency>
      <groupId>org.freemarker</groupId>
      <artifactId>freemarker</artifactId>
      <version>2.3.28</version>
    </dependency>
    <dependency>
      <groupId>net.sourceforge.fmpp</groupId>
      <artifactId>fmpp</artifactId>
      <version>0.9.16</version>
      <exclusions>
        <exclusion>
          <groupId>org.freemarker</groupId>
          <artifactId>freemarker</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
</plugin>
The fmpp configuration file

config.fmpp

# "data" declares variables.
# A variable is substituted as ${one} or ${two.three}; see the FreeMarker syntax for details.
# An external tdd file can also be referenced.
# The include directive inserts another FreeMarker template.
data: {
  one: 1,
  two: {
    three: 3
  }
  implementationFiles: [
    "parserImpls.ftl"
  ]
}

# location of linked FreeMarker templates
freemarkerLinks: {
  includes: includes/
}
FreeMarker templates

Template 1:

public class Main {
    public static void main(String[] args) {
        System.out.println(${one} + ${two.three});
    }
    /**
     * Additional inserted code
     */
<#list implementationFiles as file>
    <#include "/@includes/"+file />
</#list>
}

Template 2:

static {
    System.out.println(${one});
    System.out.println(${two.three});
}
Run the Maven plugin
mvn fmpp:generate
Generated files

The following code file is generated under the target/generated-sources/fmpp directory:

public class Main {
    public static void main(String[] args) {
        System.out.println(1 + 3);
    }

    /**
     * Additional inserted code
     */
    static {
        System.out.println(1);
        System.out.println(3);
    }
}

javacc

JavaCC produces recursive-descent parsers, i.e. LL(k):
the first L means the input is scanned from left to right;
the second L means leftmost derivation (while deriving the syntax tree, the leftmost nonterminal of the sentential form is always the one replaced with terminals);
k is the number of terminals of lookahead used to choose a production. A hand-written sketch of the idea follows.
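
A minimal hand-written illustration, an LL(1) recursive-descent parser for the toy grammar expr := NUMBER ('+' NUMBER)* (everything here is hypothetical, for illustration only):

public class TinyParser {
    private final String input;
    private int pos;

    TinyParser(String input) { this.input = input; }

    // Lookahead: peek at the next terminal without consuming it (k = 1).
    private char peek() { return pos < input.length() ? input.charAt(pos) : '$'; }

    int expr() {
        int value = number();      // leftmost derivation: expand the leftmost nonterminal first
        while (peek() == '+') {    // choose the next production using one token of lookahead
            pos++;                 // consume '+'
            value += number();
        }
        return value;
    }

    private int number() {
        int start = pos;
        while (Character.isDigit(peek())) pos++;
        return Integer.parseInt(input.substring(start, pos));
    }

    public static void main(String[] args) {
        System.out.println(new TinyParser("1+2+30").expr()); // prints 33
    }
}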

The grammar description file
options {
    // javacc options
}

PARSER_BEGIN(ParserClassName)
package packagename;
import libraryname;

public class ParserClassName {
    // arbitrary Java code
}
PARSER_END(ParserClassName)

// lexer (scanner) specification

// parser specification

For example, a Parser.tdd file can define:

package: "com.matty.flink.sql.parser.impl",
class: "SqlParserImpl",
imports: [
    "com.matty.flink.sql.parser.ddl.SqlCreateTable",
    "com.matty.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
    "com.matty.flink.sql.parser.ddl.SqlTableColumn",
    "com.matty.flink.sql.parser.ddl.SqlTableOption",
    ...
    "java.util.List",
    "java.util.ArrayList"
]
The generated classes

The following classes are generated under the configured package in target/generated-sources:

  • XXX
    The parser entry class, e.g. the SqlParserImpl specified above
  • SimpleCharStream
    SimpleCharStream jj_input_stream;
    The input stream of the lexer
// Constructor variants: both Reader and InputStream are accepted
public class SimpleCharStream {

    public SimpleCharStream(java.io.Reader dstream, int startline,
        int startcolumn, int buffersize);

    public SimpleCharStream(java.io.Reader dstream,
        int startline, int startcolumn);

    public SimpleCharStream(java.io.Reader dstream);

    public SimpleCharStream(java.io.InputStream dstream, String encoding, int startline,
        int startcolumn, int buffersize) throws java.io.UnsupportedEncodingException;

    public SimpleCharStream(java.io.InputStream dstream, int startline,
        int startcolumn, int buffersize);

    public SimpleCharStream(java.io.InputStream dstream, String encoding, int startline,
        int startcolumn) throws java.io.UnsupportedEncodingException;

    public SimpleCharStream(java.io.InputStream dstream, int startline,
        int startcolumn);

    public SimpleCharStream(java.io.InputStream dstream, String encoding) throws java.io.UnsupportedEncodingException;

    public SimpleCharStream(java.io.InputStream dstream);
}
  • XXXConstants
    Token constants, both SKIP tokens and TOKENs
    For example, a Parser.tdd file can define:
    // skipped characters
    SKIP: {
        " "
    }

    // keywords
    TOKEN: {
        <PLUS: "+">
    }

    // List of new keywords. If a keyword is not reserved, also add it
    // to the "nonReservedKeywords" section.
    keywords: [
        "WATERMARK",
        "offset"
    ]

    // keywords from the "keywords" section that are not reserved
    nonReservedKeywords: [
        "offset"
    ]

The corresponding generated Java class is:

// corresponds to the constant declarations
public interface XXXConstants {

    int EOF = 0;
    int PLUS = 2;
    int DEFAULT = 0;
    int WATERMARK = 669;
    int offset = 678;

    String[] tokenImage = {
        // EOF, end of input
        "<EOF>",
        // skipped string
        "\" \"",
        // PLUS
        "\"+\"",
        "\"WATERMARK\"",
        "\"offset\""
    };
}
  • XXXTokenManager
    The lexer (token manager)
// commonly used members
public class XXXTokenManager implements XXXConstants {
    // the input stream
    protected SimpleCharStream input_stream;
    // constructor
    public XXXTokenManager(SimpleCharStream stream);
    // returns the next token
    public Token getNextToken();
}
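
A hypothetical driver loop over the generated lexer. JavaCC derives the names <Parser>TokenManager and <Parser>Constants, so for the SqlParserImpl example above it would look roughly like this (adjust the names to your generated package):

import java.io.StringReader;

public class TokenDump {
    public static void main(String[] args) {
        SimpleCharStream stream = new SimpleCharStream(new StringReader("SELECT 1 + 2"));
        SqlParserImplTokenManager tm = new SqlParserImplTokenManager(stream);
        // iterate tokens until end of input
        for (Token t = tm.getNextToken(); t.kind != SqlParserImplConstants.EOF; t = tm.getNextToken()) {
            System.out.println(t.kind + " : " + t.image);
        }
    }
}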
  • Token
public class Token {

    // the token kind, as defined in XXXConstants
    public int kind;

    // begin/end line and column
    public int beginLine, beginColumn, endLine, endColumn;

    // the string image of the token
    public String image;

    // the next token
    public Token next;

    // special token
    public Token specialToken;

    // Returns the image.
    public String toString() {
        return image;
    }
}
  • ParseException
    Thrown on syntax errors

  • TokenMgrError
    Raised on lexical errors

Grammar notes
  • Java code
    Java code blocks are declared with {}

    // defines a Java code block
    void javaCodeDemo():
    {}
    {
        {
            int i = 0;
            System.out.println(i);
        }
    }
  • Java methods
    Declared with the JAVACODE keyword

    JAVACODE void print(Token t) {
        System.out.println(t);
    }
  • Conditionals

  1. if: []

    // if
    void ifExpr():
    {}
    {
        [
            <SELECT>
            {
                System.out.println("if select");
            }
        ]

        // optional: appears zero or one time
        (<SELECT>)?
    }
  2. if-else: |

    // if - else
    void ifElseExpr():
    {}
    {
        (
            <SELECT> { System.out.println("if else select"); }
        |
            <UPDATE> { System.out.println("if else update"); }
        |
            <DELETE> { System.out.println("if else delete"); }
        |
            {
                System.out.println("other");
            }
        )
    }
  3. Loops

  • while, 0 to n occurrences

    // while 0~n
    void while1Expr():
    {}
    {
        (<SELECT>)*
    }
  • while, 1 to n occurrences

    // while 1~n
    void while2Expr():
    {}
    {
        (<SELECT>)+
    }
  4. Regular-expression notation
    []: optional
    +: one or more occurrences
    *: zero or more occurrences
    ?: zero or one occurrence
    |: alternation
    (): grouping / precedence

Validator

SQL optimization

The optimization pipeline generally runs as follows:

  1. Lexical and syntactic analysis of the SQL, producing a syntax tree (step 1 is sketched below)
  2. Logical optimization of the SQL based on relational algebra
  3. Physical query optimization based on cost estimation
  4. Execution by the executor
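
A minimal sketch of step 1 using Calcite's public parser API (the SQL text is arbitrary):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseOnly {
    public static void main(String[] args) throws Exception {
        // Step 1 only: turn SQL text into a SqlNode syntax tree;
        // validation and optimization operate on this tree afterwards.
        SqlParser parser = SqlParser.create(
            "SELECT d.name, COUNT(*) FROM emps AS e "
                + "JOIN depts AS d ON e.deptno = d.deptno GROUP BY d.name");
        SqlNode ast = parser.parseQuery();
        System.out.println(ast);
    }
}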
Logical optimization

SQL execution

An implementation is defined per node type; execution starts from the bottommost RelNode and proceeds node by node, with a source receiving the data and a sink emitting it.

AggregateNode

FilterNode

ProjectNode

SortNode

WindowNode

Reference code for Flink SQL parsing: Flink SQL parsing

config.fmpp

#
# Calcite's parser grammar file (Parser.jj) is written in javacc
# (https://javacc.org/) with Freemarker (http://freemarker.org/) variables
# to allow clients to:
# 1. have custom parser implementation class and package name.
# 2. insert new parser method implementations written in javacc to parse
# custom:
# a) SQL statements.
# b) literals.
# c) data types.
# 3. add new keywords to support custom SQL constructs added as part of (2).
# 4. add import statements needed by inserted custom parser implementations.
#
# Parser template file (Parser.jj) along with this file are packaged as
# part of the calcite-core-<version>.jar under "codegen" directory.

# Calcite template configuration
data: {
  # pulls in the Parser.tdd data for the parser template
  parser: tdd(../data/Parser.tdd)
}

# location of the FreeMarker templates
freemarkerLinks: {
  includes: includes/
}

Parser.tdd

{
# Generated parser implementation package and class name.
package: "com.matty.flink.sql.parser.impl",
class: "SqlParserImpl",

# List of additional classes and packages to import.
# Example. "org.apache.calcite.sql.*", "java.util.List".
imports: [
"com.matty.flink.sql.parser.ddl.SqlCreateTable",
"com.matty.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
"com.matty.flink.sql.parser.ddl.SqlTableColumn",
"com.matty.flink.sql.parser.ddl.SqlTableOption",
"com.matty.flink.sql.parser.ddl.SqlCreateView",
"com.matty.flink.sql.parser.ddl.SqlCreateFunction",
"com.matty.flink.sql.parser.ddl.FunctionType",
"com.matty.flink.sql.parser.dml.RichSqlInsert",
"com.matty.flink.sql.parser.dml.RichSqlInsertKeyword",
"com.matty.flink.sql.parser.type.SqlArrayType",
"com.matty.flink.sql.parser.type.SqlBytesType",
"com.matty.flink.sql.parser.type.SqlMapType",
"com.matty.flink.sql.parser.type.SqlMultisetType",
"com.matty.flink.sql.parser.type.SqlRowType",
"com.matty.flink.sql.parser.type.SqlStringType",
"com.matty.flink.sql.parser.type.SqlTimestampType",
"com.matty.flink.sql.parser.type.SqlTimeType",
"com.matty.flink.sql.parser.validate.FlinkSqlConformance",
"com.matty.flink.sql.parser.FlinkSqlDataTypeSpec",
"org.apache.calcite.sql.SqlDrop",
"org.apache.calcite.sql.SqlCreate",
"java.util.List",
"java.util.ArrayList"
]

# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
# keyword, please also add it to 'nonReservedKeywords' section.
keywords: [
"COMMENT",
"PARTITIONED",
"IF",
"WATERMARK",
"withOffset"
"ASCENDING",
"FROM_SOURCE",
"BOUNDED",
"DELAY",
"OVERWRITE",
"STRING",
"BYTES",
"offset",
"reset",
"SCALA"
]

# List of keywords from "keywords" section that are not reserved.
# "keywords"部分中未保留的关键字列表。
nonReservedKeywords: [
"A"
"ABSENT"
"ABSOLUTE"
"ACTION"
"ADA"
"ADD"
"ADMIN"
"AFTER"
"ALWAYS"
"APPLY"
"ASC"
"ASSERTION"
"ASSIGNMENT"
"ATTRIBUTE"
"ATTRIBUTES"
"BEFORE"
"BERNOULLI"
"BREADTH"
"C"
"CASCADE"
"CATALOG"
"CATALOG_NAME"
"CENTURY"
"CHAIN"
"CHARACTER_SET_CATALOG"
"CHARACTER_SET_NAME"
"CHARACTER_SET_SCHEMA"
"CHARACTERISTICS"
"CHARACTERS"
"CLASS_ORIGIN"
"COBOL"
"COLLATION"
"COLLATION_CATALOG"
"COLLATION_NAME"
"COLLATION_SCHEMA"
"COLUMN_NAME"
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
"CONDITION_NUMBER"
"CONDITIONAL"
"CONNECTION"
"CONNECTION_NAME"
"CONSTRAINT_CATALOG"
"CONSTRAINT_NAME"
"CONSTRAINT_SCHEMA"
"CONSTRAINTS"
"CONSTRUCTOR"
"CONTINUE"
"CURSOR_NAME"
"DATA"
"DATABASE"
"DATETIME_INTERVAL_CODE"
"DATETIME_INTERVAL_PRECISION"
"DECADE"
"DEFAULTS"
"DEFERRABLE"
"DEFERRED"
"DEFINED"
"DEFINER"
"DEGREE"
"DEPTH"
"DERIVED"
"DESC"
"DESCRIPTION"
"DESCRIPTOR"
"DIAGNOSTICS"
"DISPATCH"
"DOMAIN"
"DOW"
"DOY"
"DYNAMIC_FUNCTION"
"DYNAMIC_FUNCTION_CODE"
"ENCODING"
"EPOCH"
"ERROR"
"EXCEPTION"
"EXCLUDE"
"EXCLUDING"
"FINAL"
"FIRST"
"FOLLOWING"
"FORMAT"
"FORTRAN"
"FOUND"
"FRAC_SECOND"
"G"
"GENERAL"
"GENERATED"
"GEOMETRY"
"GO"
"GOTO"
"GRANTED"
"HIERARCHY"
"IGNORE"
"IMMEDIATE"
"IMMEDIATELY"
"IMPLEMENTATION"
"INCLUDING"
"INCREMENT"
"INITIALLY"
"INPUT"
"INSTANCE"
"INSTANTIABLE"
"INVOKER"
"ISODOW"
"ISOYEAR"
"ISOLATION"
"JAVA"
"JSON"
"K"
"KEY"
"KEY_MEMBER"
"KEY_TYPE"
"LABEL"
"LAST"
"LENGTH"
"LEVEL"
"LIBRARY"
"LOCATOR"
"M"
"MATCHED"
"MAXVALUE"
"MICROSECOND"
"MESSAGE_LENGTH"
"MESSAGE_OCTET_LENGTH"
"MESSAGE_TEXT"
"MILLISECOND"
"MILLENNIUM"
"MINVALUE"
"MORE_"
"MUMPS"
"NAME"
"NAMES"
"NANOSECOND"
"NESTING"
"NORMALIZED"
"NULLABLE"
"NULLS"
"NUMBER"
"OBJECT"
"OCTETS"
"OPTION"
"OPTIONS"
"ORDERING"
"ORDINALITY"
"OTHERS"
"OUTPUT"
"OVERRIDING"
"PAD"
"PARAMETER_MODE"
"PARAMETER_NAME"
"PARAMETER_ORDINAL_POSITION"
"PARAMETER_SPECIFIC_CATALOG"
"PARAMETER_SPECIFIC_NAME"
"PARAMETER_SPECIFIC_SCHEMA"
"PARTIAL"
"PASCAL"
"PASSING"
"PASSTHROUGH"
"PAST"
"PATH"
"PLACING"
"PLAN"
"PLI"
"PRECEDING"
"PRESERVE"
"PRIOR"
"PRIVILEGES"
"PUBLIC"
"QUARTER"
"READ"
"RELATIVE"
"REPEATABLE"
"REPLACE"
"RESPECT"
"RESTART"
"RESTRICT"
"RETURNED_CARDINALITY"
"RETURNED_LENGTH"
"RETURNED_OCTET_LENGTH"
"RETURNED_SQLSTATE"
"RETURNING"
"ROLE"
"ROUTINE"
"ROUTINE_CATALOG"
"ROUTINE_NAME"
"ROUTINE_SCHEMA"
"ROW_COUNT"
"SCALAR"
"SCALE"
"SCHEMA"
"SCHEMA_NAME"
"SCOPE_CATALOGS"
"SCOPE_NAME"
"SCOPE_SCHEMA"
"SECTION"
"SECURITY"
"SELF"
"SEQUENCE"
"SERIALIZABLE"
"SERVER"
"SERVER_NAME"
"SESSION"
"SETS"
"SIMPLE"
"SIZE"
"SOURCE"
"SPACE"
"SPECIFIC_NAME"
"SQL_BIGINT"
"SQL_BINARY"
"SQL_BIT"
"SQL_BLOB"
"SQL_BOOLEAN"
"SQL_CHAR"
"SQL_CLOB"
"SQL_DATE"
"SQL_DECIMAL"
"SQL_DOUBLE"
"SQL_FLOAT"
"SQL_INTEGER"
"SQL_INTERVAL_DAY"
"SQL_INTERVAL_DAY_TO_HOUR"
"SQL_INTERVAL_DAY_TO_MINUTE"
"SQL_INTERVAL_DAY_TO_SECOND"
"SQL_INTERVAL_HOUR"
"SQL_INTERVAL_HOUR_TO_MINUTE"
"SQL_INTERVAL_HOUR_TO_SECOND"
"SQL_INTERVAL_MINUTE"
"SQL_INTERVAL_MINUTE_TO_SECOND"
"SQL_INTERVAL_MONTH"
"SQL_INTERVAL_SECOND"
"SQL_INTERVAL_YEAR"
"SQL_INTERVAL_YEAR_TO_MONTH"
"SQL_LONGVARBINARY"
"SQL_LONGVARNCHAR"
"SQL_LONGVARCHAR"
"SQL_NCHAR"
"SQL_NCLOB"
"SQL_NUMERIC"
"SQL_NVARCHAR"
"SQL_REAL"
"SQL_SMALLINT"
"SQL_TIME"
"SQL_TIMESTAMP"
"SQL_TINYINT"
"SQL_TSI_DAY"
"SQL_TSI_FRAC_SECOND"
"SQL_TSI_HOUR"
"SQL_TSI_MICROSECOND"
"SQL_TSI_MINUTE"
"SQL_TSI_MONTH"
"SQL_TSI_QUARTER"
"SQL_TSI_SECOND"
"SQL_TSI_WEEK"
"SQL_TSI_YEAR"
"SQL_VARBINARY"
"SQL_VARCHAR"
"STATE"
"STATEMENT"
"STRUCTURE"
"STYLE"
"SUBCLASS_ORIGIN"
"SUBSTITUTE"
"TABLE_NAME"
"TEMPORARY"
"TIES"
"TIMESTAMPADD"
"TIMESTAMPDIFF"
"TOP_LEVEL_COUNT"
"TRANSACTION"
"TRANSACTIONS_ACTIVE"
"TRANSACTIONS_COMMITTED"
"TRANSACTIONS_ROLLED_BACK"
"TRANSFORM"
"TRANSFORMS"
"TRIGGER_CATALOG"
"TRIGGER_NAME"
"TRIGGER_SCHEMA"
"TYPE"
"UNBOUNDED"
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
"USER_DEFINED_TYPE_CODE"
"USER_DEFINED_TYPE_NAME"
"USER_DEFINED_TYPE_SCHEMA"
"UTF8"
"UTF16"
"UTF32"
"VERSION"
"VIEW"
"WEEK"
"WRAPPER"
"WORK"
"WRITE"
"XML"
"ZONE",

# not in core, added in Flink
"PARTITIONED",
"IF",
"ASCENDING",
"FROM_SOURCE",
"BOUNDED",
"DELAY",
"OVERWRITE",
"offset",
"reset"
]

# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
]

# List of methods for parsing custom literals.
# Return type of method implementation should be "SqlNode".
# Example: ParseJsonLiteral().
literalParserMethods: [
]

# List of methods for parsing ddl supported data types.
# Return type of method implementation should be "SqlIdentifier".
# Example: SqlParseTimeStampZ().
flinkDataTypeParserMethods: [
"SqlArrayType()",
"SqlMultisetType()",
"SqlMapType()",
"SqlRowType()",
"SqlStringType()",
"SqlBytesType()",
"SqlTimestampType()",
"SqlTimeType()"
]

# List of methods for parsing custom data types.
# Return type of method implementation should be "SqlIdentifier".
# Example: SqlParseTimeStampZ().
dataTypeParserMethods: [
]

# List of methods for parsing builtin function calls.
# Return type of method implementation should be "SqlNode".
# Example: DateFunctionCall().
builtinFunctionCallMethods: [
]

# List of methods for parsing extensions to "ALTER <scope>" calls.
# Each must accept arguments "(SqlParserPos pos, String scope)".
# Example: "SqlUploadJarNode"
# 解析扩展到"Alter"调用的方法。
# 每个都必须接受参数 "(SqlParserPos pos, String scope)"
alterStatementParserMethods: [
]

# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
# 解析扩展以"CREATE [OR REPLACE]"调用的方法列表。
# 每个都必须接受参数 "(SqlParserPos pos, boolean replace)"
createStatementParserMethods: [
"SqlCreateTable",
"SqlCreateView",
"SqlCreateFunction"
]

# List of methods for parsing extensions to "DROP" calls.
# Each must accept arguments "(Span s)".
# 解析扩展到"DROP"调用的方法列表。
# 每个都必须接受参数 "(SqlParserPos pos)"
dropStatementParserMethods: [
]

# List of files in @includes directory that have parser method
# implementations for parsing custom SQL statements, literals or types
# given as part of "statementParserMethods", "literalParserMethods" or
# "dataTypeParserMethods".
implementationFiles: [
"parserImpls.ftl"
]

# List of additional join types. Each is a method with no arguments.
# Example: LeftSemiJoin()
joinTypes: [
]

includeCompoundIdentifier: true
includeBraces: true
includeAdditionalDeclarations: false
}

The following explains the Parser.tdd file together with the Parser.jj template that is packaged into the calcite-core jar:

package, class, imports

Responsible for the imported packages, the package directory the parser is generated into, and the name of the generated class

PARSER_BEGIN(${parser.class})

package ${parser.package};

<#list parser.imports as importStr>
import ${importStr};
</#list>

keywords

Defines the keywords

<DEFAULT, DQID, BTID> TOKEN :
{
...
<#-- additional parser keywords are included here -->
<#list parser.keywords as keyword>
| < ${keyword}: "${keyword}" >
</#list>
}

nonReservedKeywords

The non-reserved keywords among those defined in the keywords section

String NonReservedKeyWord() :
{
}
{
(
NonReservedKeyWord0of3()
| NonReservedKeyWord1of3()
| NonReservedKeyWord2of3()
)
{
return unquotedIdentifier();
}
}

/** @see #NonReservedKeyWord */
void NonReservedKeyWord0of3() :
{
}
{
(
<#list parser.nonReservedKeywords as keyword>
<#if keyword?index == 0>
<${keyword}>
<#elseif keyword?index % 3 == 0>
| <${keyword}>
</#if>
</#list>
)
}

/** @see #NonReservedKeyWord */
void NonReservedKeyWord1of3() :
{
}
{
(
<#list parser.nonReservedKeywords as keyword>
<#if keyword?index == 1>
<${keyword}>
<#elseif keyword?index % 3 == 1>
| <${keyword}>
</#if>
</#list>
)
}

/** @see #NonReservedKeyWord */
void NonReservedKeyWord2of3() :
{
}
{
(
<#list parser.nonReservedKeywords as keyword>
<#if keyword?index == 2>
<${keyword}>
<#elseif keyword?index % 3 == 2>
| <${keyword}>
</#if>
</#list>
)
}

joinTypes

The join types

SqlLiteral JoinType() :
{
JoinType joinType;
}
{
(
<JOIN> { joinType = JoinType.INNER; }
|
<INNER> <JOIN> { joinType = JoinType.INNER; }
|
<LEFT> [ <OUTER> ] <JOIN> { joinType = JoinType.LEFT; }
|
<RIGHT> [ <OUTER> ] <JOIN> { joinType = JoinType.RIGHT; }
|
<FULL> [ <OUTER> ] <JOIN> { joinType = JoinType.FULL; }
|
<CROSS> <JOIN> { joinType = JoinType.CROSS; }
<#list parser.joinTypes as method>
|
joinType = ${method}()
</#list>
)
{
return joinType.symbol(getPos());
}
}

statementParserMethods

The list of methods that parse custom SQL statements; each must return a SqlNode

/**
* Parses an SQL statement.
*/
SqlNode SqlStmt() :
{
SqlNode stmt;
}
{
(
<#-- Add methods to parse additional statements here -->
<#list parser.statementParserMethods as method>
LOOKAHEAD(2) stmt = ${method}
|
</#list>
stmt = SqlSetOption(Span.of(), null)
|
stmt = SqlAlter()
|
<#if parser.createStatementParserMethods?size != 0>
stmt = SqlCreate()
|
</#if>
<#if parser.dropStatementParserMethods?size != 0>
stmt = SqlDrop()
|
</#if>
stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
|
stmt = SqlExplain()
|
stmt = SqlDescribe()
|
stmt = SqlInsert()
|
stmt = SqlDelete()
|
stmt = SqlUpdate()
|
stmt = SqlMerge()
|
stmt = SqlProcedureCall()
)
{
return stmt;
}
}

literalParserMethods

The list of methods that parse custom literals; each must return a SqlNode

SqlNode Literal() :
{
SqlNode e;
}
{
(
e = NumericLiteral()
|
e = StringLiteral()
|
e = SpecialLiteral()
|
e = DateTimeLiteral()
|
e = IntervalLiteral()
<#-- additional literal parser methods are included here -->
<#list parser.literalParserMethods as method>
|
e = ${method}
</#list>
)
{
return e;
}


}

dataTypeParserMethods

The list of methods that parse custom data types; each must return a SqlIdentifier

// Some SQL type names need special handling due to the fact that they have
// spaces in them but are not quoted.
SqlIdentifier TypeName() :
{
final SqlTypeName sqlTypeName;
final SqlIdentifier typeName;
final Span s = Span.of();
}
{
(
LOOKAHEAD(2)
sqlTypeName = SqlTypeName(s) {
typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
}
<#-- additional types are included here -->
<#list parser.dataTypeParserMethods as method>
|
typeName = ${method}
</#list>
|
typeName = CollectionsTypeName()
|
typeName = CompoundIdentifier()
)
{
return typeName;
}
}

alterStatementParserMethods

Parses custom ALTER statements; each method must accept the arguments (SqlParserPos pos, String scope)

/**
* Parses an expression for setting or resetting an option in SQL, such as QUOTED_IDENTIFIERS,
* or explain plan level (physical/logical).
*/
SqlAlter SqlAlter() :
{
final Span s;
final String scope;
final SqlAlter alterNode;
}
{
<ALTER> { s = span(); }
scope = Scope()
(
<#-- additional literal parser methods are included here -->
<#list parser.alterStatementParserMethods as method>
alterNode = ${method}(s, scope)
|
</#list>

alterNode = SqlSetOption(s, scope)
)
{
return alterNode;
}
}

createStatementParserMethods

Parses custom CREATE statements; each method must accept the arguments (SqlParserPos pos, boolean replace)

<#if parser.createStatementParserMethods?size != 0>
/**
* Parses a CREATE statement.
*/
SqlCreate SqlCreate() :
{
final Span s;
boolean replace = false;
final SqlCreate create;
}
{
<CREATE> { s = span(); }
[
<OR> <REPLACE> {
replace = true;
}
]
(
<#-- additional literal parser methods are included here -->
<#list parser.createStatementParserMethods as method>
create = ${method}(s, replace)
<#sep>|</#sep>
</#list>
)
{
return create;
}
}
</#if>

dropStatementParserMethods

Parses custom DROP statements; each method must accept the argument (SqlParserPos pos)

<#if parser.dropStatementParserMethods?size != 0>
/**
* Parses a DROP statement.
*/
SqlDrop SqlDrop() :
{
final Span s;
boolean replace = false;
final SqlDrop drop;
}
{
<DROP> { s = span(); }
(
<#-- additional literal parser methods are included here -->
<#list parser.dropStatementParserMethods as method>
drop = ${method}(s, replace)
<#sep>|</#sep>
</#list>
)
{
return drop;
}
}
</#if>

implementationFiles

Custom template files

<#-- Add implementations of additional parser statement calls here -->
<#list parser.implementationFiles as file>
<#include "/@includes/"+file />
</#list>

includeCompoundIdentifier

Whether to include the CompoundIdentifier production

Common Parser.jj methods

getPos

Returns the position of the current token; used when writing custom parse methods

StringLiteral

Parses a string-typed field in a statement, enclosed in single quotes:

select * from table where a='string';

Identifier

Parses an identifier field and returns it as a String:

select Identifier from table;

There is SimpleIdentifier, which can be a table name, column name, alias and so on,
and CompoundIdentifier, used to parse compound names such as kafka.bootstrap.servers.
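
A small sketch of the difference using Calcite's parser (the printed shape is approximate):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class IdentifierDemo {
    public static void main(String[] args) throws Exception {
        // kafka.bootstrap.servers parses as a single CompoundIdentifier;
        // name and t below are SimpleIdentifiers.
        SqlNode ast = SqlParser.create("SELECT kafka.bootstrap.servers, name FROM t").parseQuery();
        System.out.println(ast); // prints something like SELECT `KAFKA`.`BOOTSTRAP`.`SERVERS`, `NAME` FROM `T`
    }
}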

The following walks through examples as actually used in Flink SQL parsing:

DDL

CREATE TABLE

Custom SQL syntax

Custom SQL DDL statements create source tables, dimension tables and sink tables. A few common CREATE TABLE examples:

  • Create a Kafka source table, with a computed column and a watermark definition

    CREATE TABLE table_source(
        channel varchar, -- channel
        pv int, -- number of clicks
        xctime varchar, -- timestamp in yyyyMMddHHmmss format, as a string
        ts AS TO_TIMESTAMP(xctime,'yyyyMMddHHmmss'), -- rowtime; must be of Timestamp or Long type
        WATERMARK FOR ts AS withOffset( ts , '120000' ) -- watermark strategy: allow 2 minutes of out-of-order time, i.e. data may arrive up to 2 minutes late
    )WITH(
        type='kafka11',
        kafka.bootstrap.servers='mwt:9092',
        kafka.auto.offset.reset='latest',
        kafka.kerberos.enabled='false',
        kafka.data.type='json',
        kafka.topic='table_source',
        parallelism='1'
    )
  • Create a MySQL dimension table

    CREATE TABLE table_side(
        name varchar comment 'name', -- a field may carry a comment
        info varchar comment 'details',
        PRIMARY KEY(name), -- a dimension table must declare a PRIMARY KEY
        PERIOD FOR SYSTEM_TIME -- dimension-table marker
    )WITH(
        type ='mysql',
        url ='jdbc:mysql://192.168.1.8:3306/demo?charset=utf8',
        userName ='dipper',
        password ='ide@123',
        tableName ='table_side',
        cache ='NONE',
        parallelism ='1'
    )
  • Create a sink table

    CREATE TABLE table_sink(
        name varchar,
        channel varchar,
        pv int,
        xctime bigint,
        info varchar
    )WITH(
        type ='console',
        parallelism='1'
    );
Defining the parse-result classes
  • SqlCreateTable.java

    package com.matty.flink.sql.parser.ddl;

    import com.matty.flink.sql.parser.ExtendedSqlNode;
    import com.matty.flink.sql.parser.exception.SqlParseException;
    import lombok.Data;
    import org.apache.calcite.sql.*;
    import org.apache.calcite.sql.dialect.AnsiSqlDialect;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.calcite.sql.pretty.SqlPrettyWriter;
    import org.apache.calcite.util.ImmutableNullableList;
    import org.apache.calcite.util.NlsString;

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import static java.util.Objects.requireNonNull;

    /**
    * Description:
    *
    * @author mwt
    * @version 1.0
    * @date 2019-10-08
    */
    @Data
    public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {

    public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);

    private final SqlIdentifier tableName;

    private final SqlNodeList columnList;

    private final SqlNodeList propertyList;

    private final SqlNodeList primaryKeyList;

    private final SqlCharStringLiteral comment;

    private final boolean sideFlag;

    private SqlIdentifier eventTimeField;

    private SqlNode maxOutOrderless;

    public SqlCreateTable(
    SqlParserPos pos,
    SqlIdentifier tableName,
    SqlNodeList columnList,
    SqlNodeList primaryKeyList,
    SqlNodeList propertyList,
    SqlCharStringLiteral comment,
    boolean sideFlag,
    SqlIdentifier eventTimeField,
    SqlNode maxOutOrderless) {
    super(OPERATOR, pos, false, false);
    this.tableName = requireNonNull(tableName, "Table name is missing");
    this.columnList = requireNonNull(columnList, "Column list should not be null");
    this.primaryKeyList = primaryKeyList;
    this.propertyList = propertyList;
    this.comment = comment;
    this.sideFlag = sideFlag;
    this.eventTimeField = eventTimeField;
    this.maxOutOrderless = maxOutOrderless;
    }

    @Override
    public SqlOperator getOperator() {
    return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
    return ImmutableNullableList.of(tableName, columnList, primaryKeyList,
    propertyList, comment);
    }

    public Long getMaxOutOrderless() {
    return Long.parseLong(((NlsString) SqlLiteral.value(maxOutOrderless)).getValue());
    }


    @Override
    public void validate() throws SqlParseException {
    Set<String> columnNames = new HashSet<>();
    if (columnList != null) {
    for (SqlNode column : columnList) {
    String columnName = null;
    if (column instanceof SqlTableColumn) {
    SqlTableColumn tableColumn = (SqlTableColumn) column;
    columnName = tableColumn.getName().getSimple();
    if (tableColumn.getAlias() != null) {
    columnName = tableColumn.getAlias().getSimple();
    }
    String typeName = tableColumn.getType().getTypeName().getSimple();
    if (SqlColumnType.getType(typeName).isUnsupported()) {
    throw new SqlParseException(
    column.getParserPosition(),
    "Not support type [" + typeName + "], at " + column.getParserPosition());
    }
    } else if (column instanceof SqlBasicCall) {
    SqlBasicCall tableColumn = (SqlBasicCall) column;
    columnName = tableColumn.getOperands()[1].toString();
    }

    if (!columnNames.add(columnName)) {
    throw new SqlParseException(
    column.getParserPosition(),
    "Duplicate column name [" + columnName + "], at " +
    column.getParserPosition());
    }
    }
    }

    if (this.primaryKeyList != null) {
    for (SqlNode primaryKeyNode : this.primaryKeyList) {
    String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
    if (!columnNames.contains(primaryKey)) {
    throw new SqlParseException(
    primaryKeyNode.getParserPosition(),
    "Primary key [" + primaryKey + "] not defined in columns, at " +
    primaryKeyNode.getParserPosition());
    }
    }
    }

    }

    public boolean containsComputedColumn() {
    for (SqlNode column : columnList) {
    if (column instanceof SqlBasicCall) {
    return true;
    }
    }
    return false;
    }

    /**
    * Returns the projection format of the DDL columns(including computed columns).
    * e.g. If we got a DDL:
    * <pre>
    * create table tbl1(
    * col1 int,
    * col2 varchar,
    * col3 as to_timestamp(col2)
    * ) with (
    * 'connector' = 'csv'
    * )
    * </pre>
    * we would return a query like:
    *
    * <p>"col1, col2, to_timestamp(col2) as col3", caution that the "computed column" operands
    * have been reversed.
    */
    public String getColumnSqlString() {
    SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
    writer.setAlwaysUseParentheses(true);
    writer.setSelectListItemsOnSeparateLines(false);
    writer.setIndentation(0);
    writer.startList("", "");
    for (SqlNode column : columnList) {
    writer.sep(",");
    if (column instanceof SqlTableColumn) {
    SqlTableColumn tableColumn = (SqlTableColumn) column;
    tableColumn.getName().unparse(writer, 0, 0);
    } else {
    column.unparse(writer, 0, 0);
    }
    }

    return writer.toString();
    }

    @Override
    public void unparse(
    SqlWriter writer,
    int leftPrec,
    int rightPrec) {
    writer.keyword("CREATE TABLE");
    tableName.unparse(writer, leftPrec, rightPrec);
    SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
    for (SqlNode column : columnList) {
    printIndent(writer);
    if (column instanceof SqlBasicCall) {
    SqlCall call = (SqlCall) column;
    SqlCall newCall = call.getOperator().createCall(
    SqlParserPos.ZERO,
    call.operand(1),
    call.operand(0));
    newCall.unparse(writer, leftPrec, rightPrec);
    } else {
    column.unparse(writer, leftPrec, rightPrec);
    }
    }

    if (primaryKeyList != null && primaryKeyList.size() > 0) {
    printIndent(writer);
    writer.keyword("PRIMARY KEY");
    SqlWriter.Frame keyFrame = writer.startList("(", ")");
    primaryKeyList.unparse(writer, leftPrec, rightPrec);
    writer.endList(keyFrame);
    }

    if (sideFlag) {
    printIndent(writer);
    writer.keyword("PERIOD FOR SYSTEM_TIME");
    }

    if (eventTimeField != null) {
    printIndent(writer);
    writer.keyword("WATERMARK FOR ");
    eventTimeField.unparse(writer, leftPrec, rightPrec);
    writer.keyword("AS withOffset");
    SqlWriter.Frame offsetFrame = writer.startList("(", ")");
    eventTimeField.unparse(writer, leftPrec, rightPrec);
    writer.keyword(",");
    maxOutOrderless.unparse(writer, leftPrec, rightPrec);
    writer.endList(offsetFrame);
    }

    writer.newlineAndIndent();
    writer.endList(frame);

    if (comment != null) {
    writer.newlineAndIndent();
    writer.keyword("COMMENT");
    comment.unparse(writer, leftPrec, rightPrec);
    }

    if (propertyList != null) {
    writer.keyword("WITH");
    SqlWriter.Frame withFrame = writer.startList("(", ")");
    for (SqlNode property : propertyList) {
    printIndent(writer);
    property.unparse(writer, leftPrec, rightPrec);
    }
    writer.newlineAndIndent();
    writer.endList(withFrame);
    }
    }

    private void printIndent(SqlWriter writer) {
    writer.sep(",", false);
    writer.newlineAndIndent();
    writer.print(" ");
    }

    /**
    * Table creation context.
    */
    public static class TableCreationContext {
    public List<SqlNode> columnList = new ArrayList<>();
    public SqlNodeList primaryKeyList;
    public boolean sideFlag;
    public SqlIdentifier eventTimeField;
    public SqlNode maxOutOrderless;
    }

    public String[] fullTableName() {
    return tableName.names.toArray(new String[0]);
    }
    }
  • SqlTableColumn.java

    @Data
    public class SqlTableColumn extends SqlCall {
    private static final SqlSpecialOperator OPERATOR =
    new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);

    private SqlIdentifier name;
    private SqlDataTypeSpec type;
    private SqlIdentifier alias;
    private SqlCharStringLiteral comment;

    public SqlTableColumn(SqlIdentifier name,
    SqlDataTypeSpec type,
    SqlIdentifier alias,
    SqlCharStringLiteral comment,
    SqlParserPos pos) {
    super(pos);
    this.name = requireNonNull(name, "Column name should not be null");
    this.type = requireNonNull(type, "Column type should not be null");
    this.alias = alias;
    this.comment = comment;
    }

    @Override
    public SqlOperator getOperator() {
    return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
    return ImmutableNullableList.of(name, type, comment);
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
    this.name.unparse(writer, leftPrec, rightPrec);
    writer.print(" ");
    ExtendedSqlType.unparseType(type, writer, leftPrec, rightPrec);
    if (this.alias != null) {
    writer.print(" AS ");
    this.alias.unparse(writer, leftPrec, rightPrec);
    }
    if (this.comment != null) {
    writer.print(" COMMENT ");
    this.comment.unparse(writer, leftPrec, rightPrec);
    }
    }

    }
The javacc grammar template
  • The main grammar for SqlCreateTable

    /**
    * Parse a table creation.
    */
    SqlCreate SqlCreateTable(Span s, boolean replace) :
    {
    final SqlParserPos startPos = s.pos();
    SqlIdentifier tableName;
    SqlNodeList primaryKeyList = null;
    SqlNodeList columnList = SqlNodeList.EMPTY;
    SqlCharStringLiteral comment = null;
    boolean sideFlag = false;
    SqlIdentifier eventTimeField = null;
    SqlNode maxOutOrderless = null;

    SqlNodeList propertyList = SqlNodeList.EMPTY;
    SqlParserPos pos = startPos;
    }
    {
    <TABLE>

    tableName = CompoundIdentifier()
    [
    <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
    TableColumn(ctx)
    (
    <COMMA> TableColumn(ctx)
    )*
    {
    pos = pos.plus(getPos());
    columnList = new SqlNodeList(ctx.columnList, pos);
    primaryKeyList = ctx.primaryKeyList;
    sideFlag = ctx.sideFlag;
    eventTimeField = ctx.eventTimeField;
    maxOutOrderless = ctx.maxOutOrderless;
    }
    <RPAREN>
    ]

    [ <COMMENT> <QUOTED_STRING> {
    String p = SqlParserUtil.parseString(token.image);
    comment = SqlLiteral.createCharString(p, getPos());
    }]
    [
    <WITH>
    propertyList = TableProperties()
    ]
    {
    return new SqlCreateTable(startPos.plus(getPos()),
    tableName,
    columnList,
    primaryKeyList,
    propertyList,
    comment,
    sideFlag,
    eventTimeField,
    maxOutOrderless);
    }
    }
  • TableColumn
    The TableCreationContext is passed to TableColumn as a parameter, and every parse result is stored into it. It parses, in turn, ordinary columns, the primary-key definition, computed columns, the watermark clause and the dimension-table marker.

    /**
     * Parses the table columns.
     */
    void TableColumn(TableCreationContext context) :
    {
    }
    {
    (LOOKAHEAD(3)
    TableColumn2(context.columnList)
    |
    context.primaryKeyList = PrimaryKey()
    |
    Watermark(context)
    |
    ComputedColumn(context)
    |
    context.sideFlag = SideFlag()
    )
    }
  • TableColumn2
    Parses a column definition in the CREATE TABLE statement; the type is the custom FlinkSqlDataTypeSpec

    void TableColumn2(List<SqlNode> list) :
    {
    SqlParserPos pos;
    SqlIdentifier name;
    SqlDataTypeSpec type;
    SqlIdentifier alias = null;
    SqlCharStringLiteral comment = null;
    }
    {
    name = SimpleIdentifier()
    <#-- #FlinkDataType already takes care of the nullable attribute. -->
    type = FlinkDataType()
    [ <AS> alias = SimpleIdentifier()
    ]
    [ <COMMENT> <QUOTED_STRING> {
    String p = SqlParserUtil.parseString(token.image);
    comment = SqlLiteral.createCharString(p, getPos());
    }]
    {
    SqlTableColumn tableColumn = new SqlTableColumn(name, type , alias ,comment , getPos());
    list.add(tableColumn);
    }
    }
  • FlinkDataType
    Flink SQL has many custom types

    /**
    * Parse a Flink data type with nullable options, NULL -> nullable, NOT NULL -> not nullable.
    * Default to be nullable.
    */
    SqlDataTypeSpec FlinkDataType() :
    {
    final SqlIdentifier typeName;
    SqlIdentifier collectionTypeName = null;
    int scale = -1;
    int precision = -1;
    String charSetName = null;
    final Span s;
    boolean nullable = true;
    boolean elementNullable = true;
    }
    {
    typeName = FlinkTypeName() {
    s = span();
    }
    [
    <LPAREN>
    precision = UnsignedIntLiteral()
    [
    <COMMA>
    scale = UnsignedIntLiteral()
    ]
    <RPAREN>
    ]
    elementNullable = NullableOpt()
    [
    collectionTypeName = FlinkCollectionsTypeName()
    nullable = NullableOpt()
    ]
    {
    if (null != collectionTypeName) {
    return new FlinkSqlDataTypeSpec(
    collectionTypeName,
    typeName,
    precision,
    scale,
    charSetName,
    nullable,
    elementNullable,
    s.end(collectionTypeName));
    }
    nullable = elementNullable;
    return new FlinkSqlDataTypeSpec(typeName,
    precision,
    scale,
    charSetName,
    null,
    nullable,
    elementNullable,
    s.end(this));
    }
    }
  • PrimaryKey
    Parses the primary-key definition; there may be more than one key column

    /**
     * Primary-key definition.
     */
    SqlNodeList PrimaryKey() :
    {
    List<SqlNode> pkList = new ArrayList<SqlNode>();

    SqlParserPos pos;
    SqlIdentifier columnName;
    }
    {
    <PRIMARY> { pos = getPos(); } <KEY> <LPAREN>
    columnName = SimpleIdentifier() { pkList.add(columnName); }
    (<COMMA> columnName = SimpleIdentifier() { pkList.add(columnName); })*
    <RPAREN>
    {
    return new SqlNodeList(pkList, pos.plus(getPos()));
    }
    }
  • Watermark
    Parses the watermark clause; event-time windows require a watermark

    void Watermark(TableCreationContext context) :
    {
    SqlNode identifier;
    SqlNode expr;
    boolean hidden = false;
    SqlParserPos pos;
    SqlIdentifier eventTimeField;
    SqlNode maxOutOrderless;
    }
    {
    <WATERMARK> <FOR> eventTimeField = SimpleIdentifier() <AS> <withOffset>
    <LPAREN>
    eventTimeField = SimpleIdentifier() { context.eventTimeField = eventTimeField; }
    <COMMA> maxOutOrderless = StringLiteral() {context.maxOutOrderless = maxOutOrderless; }
    <RPAREN>
    }
  • ComputedColumn
    Parses computed columns

    void ComputedColumn(TableCreationContext context) :
    {
    SqlNode identifier = null;
    SqlNode expr;
    boolean hidden = false;
    SqlParserPos pos;
    }
    {
    identifier = SimpleIdentifier() {pos = getPos();}
    <AS>
    expr = Expression(ExprContext.ACCEPT_SUB_QUERY) {
    expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
    context.columnList.add(expr);
    }
    }
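
    For a column such as ts AS TO_TIMESTAMP(xctime, 'yyyyMMddHHmmss') (see the test at the end of this article), the action wraps the parsed expression in an AS call via SqlStdOperatorTable.AS.createCall(pos, expr, identifier), with the expression first and the column alias second, and adds that call to context.columnList.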
  • SideFlag
    Parses the dimension (side) table flag; if it returns true, the statement creates a dimension table.

    /**
    * Dimension table flag.
    */
    boolean SideFlag() :
    {
    }
    {
    <PERIOD> <FOR> <SYSTEM_TIME> { return true; }
    |
    { return false; }
    }
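
    In the DDL this corresponds to a trailing PERIOD FOR SYSTEM_TIME clause on the table definition; when present, SideFlag() returns true and the table is treated as a dimension table, otherwise it returns false.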
  • TableProperties
    Parses the WITH property list.

    /**
    * Parses the table property list.
    */
    SqlNodeList TableProperties():
    {
    SqlNode property;
    final List<SqlNode> proList = new ArrayList<SqlNode>();
    final Span span;
    }
    {
    <LPAREN> { span = span(); }
    [
    property = TableOption()
    {
    proList.add(property);
    }
    (
    <COMMA> property = TableOption()
    {
    proList.add(property);
    }
    )*
    ]
    <RPAREN>
    { return new SqlNodeList(proList, span.end(this)); }
    }

    SqlNode TableOption() :
    {
    SqlIdentifier key;
    SqlNode value;
    }
    {
    key = CompoundIdentifier()
    <EQ> value = StringLiteral()
    {
    return new SqlTableOption(key, value, getPos());
    }
    }
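
    The WITH clause in the test below is exactly what these two productions consume: each key = 'value' pair becomes a SqlTableOption (the key is a CompoundIdentifier, so dotted names such as kafka.topic are legal), and TableProperties() collects the options into one SqlNodeList. An illustrative option list:

    // Matched by TableProperties():
    String with = "(type = 'kafka11', kafka.topic = 'table_source', parallelism = '1')";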
Custom field types
  • FlinkSqlDataTypeSpec
SqlBytesType
SqlIdentifier SqlBytesType() :
{
}
{
<BYTES> { return new SqlBytesType(getPos()); }
}
public class SqlBytesType extends SqlIdentifier implements ExtendedSqlType {
public SqlBytesType(SqlParserPos pos) {
super("BYTES", pos);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("BYTES");
}
}
SqlStringType
SqlIdentifier SqlStringType() :
{
}
{
<STRING> { return new SqlStringType(getPos()); }
}
public class SqlStringType extends SqlIdentifier implements ExtendedSqlType {
public SqlStringType(SqlParserPos pos) {
super("STRING", pos);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("STRING");
}
}
SqlArrayType
SqlIdentifier SqlArrayType() :
{
SqlParserPos pos;
SqlDataTypeSpec elementType;
}
{
<ARRAY> { pos = getPos(); }
<LT>
elementType = FlinkDataType()
<GT>
{
return new SqlArrayType(pos, elementType);
}
}
public class SqlArrayType extends SqlIdentifier implements ExtendedSqlType {

private final SqlDataTypeSpec elementType;

public SqlArrayType(SqlParserPos pos, SqlDataTypeSpec elementType) {
super(SqlTypeName.ARRAY.getName(), pos);
this.elementType = elementType;
}

public SqlDataTypeSpec getElementType() {
return elementType;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("ARRAY<");
ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
writer.keyword(">");
}
}
SqlMultisetType
SqlIdentifier SqlMultisetType() :
{
SqlParserPos pos;
SqlDataTypeSpec elementType;
}
{
<MULTISET> { pos = getPos(); }
<LT>
elementType = FlinkDataType()
<GT>
{
return new SqlMultisetType(pos, elementType);
}
}
public class SqlMultisetType extends SqlIdentifier implements ExtendedSqlType {
private final SqlDataTypeSpec elementType;

public SqlMultisetType(SqlParserPos pos, SqlDataTypeSpec elementType) {
super(SqlTypeName.MULTISET.getName(), pos);
this.elementType = elementType;
}

public SqlDataTypeSpec getElementType() {
return elementType;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("MULTISET<");
ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
writer.keyword(">");
}
}
SqlMapType
SqlIdentifier SqlMapType() :
{
SqlDataTypeSpec keyType;
SqlDataTypeSpec valType;
}
{
<MAP>
<LT>
keyType = FlinkDataType()
<COMMA>
valType = FlinkDataType()
<GT>
{
return new SqlMapType(getPos(), keyType, valType);
}
}
public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
private final SqlDataTypeSpec keyType;
private final SqlDataTypeSpec valType;

public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
super(SqlTypeName.MAP.getName(), pos);
this.keyType = keyType;
this.valType = valType;
}

public SqlDataTypeSpec getKeyType() {
return keyType;
}

public SqlDataTypeSpec getValType() {
return valType;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("MAP");
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
writer.sep(",");
ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
writer.sep(",");
ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
writer.endList(frame);
}
}
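
Because element types recurse into FlinkDataType(), the collection types nest freely. A few illustrative fragments:

// Accepted by SqlArrayType(), SqlMultisetType() and SqlMapType() respectively:
String a = "ARRAY<INT>";
String m = "MULTISET<VARCHAR(10)>";
String kv = "MAP<VARCHAR(10), ARRAY<INT NOT NULL>>"; // nested element type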
SqlRowType
/**
* Parses a Row type; both Row(name1 type1, name2 type2) and Row<name1 type1, name2 type2> are supported.
* Every field type can have a NULL or NOT NULL suffix to indicate whether it is nullable,
* e.g. Row(f0 int not null, f1 varchar null).
*/
SqlIdentifier SqlRowType() :
{
List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>();
}
{
<ROW>
(
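// "<>" lexes as the single NE token, so this alternative matches an empty ROW<>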
<NE>
|
<LT> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT>
|
<LPAREN> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <RPAREN>
)
{
return new SqlRowType(getPos(), fieldNames, fieldTypes, comments);
}
}

/**
* Parse a "name1 type1 ['i'm a comment'], name2 type2 ..." list.
*/
void FieldNameTypeCommaList(
List<SqlIdentifier> fieldNames,
List<SqlDataTypeSpec> fieldTypes,
List<SqlCharStringLiteral> comments) :
{
SqlIdentifier fName;
SqlDataTypeSpec fType;
}
{
[
fName = SimpleIdentifier()
fType = FlinkDataType()
{
fieldNames.add(fName);
fieldTypes.add(fType);
}
(
<QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comments.add(SqlLiteral.createCharString(p, getPos()));
}
|
{ comments.add(null); }
)
]
(
<COMMA>
fName = SimpleIdentifier()
fType = FlinkDataType()
{
fieldNames.add(fName);
fieldTypes.add(fType);
}
(
<QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comments.add(SqlLiteral.createCharString(p, getPos()));
}
|
{ comments.add(null); }
)
)*
}
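
Illustrative ROW declarations accepted by these two productions (hypothetical field names):

String r1 = "ROW<name VARCHAR 'user name', age INT>"; // per-field comment as a quoted string
String r2 = "ROW(f0 INT NOT NULL, f1 VARCHAR NULL)";  // parenthesized form with explicit nullability
String r3 = "ROW<>";                                  // empty row type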
@Data
public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
private final List<SqlIdentifier> fieldNames;
private final List<SqlDataTypeSpec> fieldTypes;
private final List<SqlCharStringLiteral> comments;

public SqlRowType(SqlParserPos pos,
List<SqlIdentifier> fieldNames,
List<SqlDataTypeSpec> fieldTypes,
List<SqlCharStringLiteral> comments) {
super(SqlTypeName.ROW.getName(), pos);
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
this.comments = comments;
}

public int getArity() {
return fieldNames.size();
}

public SqlIdentifier getFieldName(int i) {
return fieldNames.get(i);
}

public SqlDataTypeSpec getFieldType(int i) {
return fieldTypes.get(i);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.print("ROW");
if (getFieldNames().size() == 0) {
writer.print("<>");
} else {
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
int i = 0;
for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
writer.sep(",", false);
p.left.unparse(writer, 0, 0);
ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
if (comments.get(i) != null) {
comments.get(i).unparse(writer, leftPrec, rightPrec);
}
i += 1;
}
writer.endList(frame);
}
}
}
SqlTimeType
SqlIdentifier SqlTimeType() :
{
int precision = -1;
boolean withLocalTimeZone = false;
}
{
<TIME>
(
<LPAREN> precision = UnsignedIntLiteral() <RPAREN>
|
{ precision = -1; }
)
withLocalTimeZone = WithLocalTimeZone()
{ return new SqlTimeType(getPos(), precision, withLocalTimeZone); }
}
public class SqlTimeType extends SqlIdentifier implements ExtendedSqlType {
private final int precision;
private final boolean withLocalTimeZone;

public SqlTimeType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
super(getTypeName(withLocalTimeZone), pos);
this.precision = precision;
this.withLocalTimeZone = withLocalTimeZone;
}

private static String getTypeName(boolean withLocalTimeZone) {
if (withLocalTimeZone) {
return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE.name();
} else {
return SqlTypeName.TIME.name();
}
}

public SqlTypeName getSqlTypeName() {
if (withLocalTimeZone) {
return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE;
} else {
return SqlTypeName.TIME;
}
}

public int getPrecision() {
return this.precision;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword(SqlTypeName.TIME.name());
if (this.precision >= 0) {
final SqlWriter.Frame frame =
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
writer.print(precision);
writer.endList(frame);
}
if (this.withLocalTimeZone) {
writer.keyword("WITH LOCAL TIME ZONE");
}
}
}
SqlTimestampType
SqlIdentifier SqlTimestampType() :
{
int precision = -1;
boolean withLocalTimeZone = false;
}
{
<TIMESTAMP>
(
<LPAREN> precision = UnsignedIntLiteral() <RPAREN>
|
{ precision = -1; }
)
withLocalTimeZone = WithLocalTimeZone()
{ return new SqlTimestampType(getPos(), precision, withLocalTimeZone); }
}
public class SqlTimestampType extends SqlIdentifier implements ExtendedSqlType {
private final int precision;
private final boolean withLocalTimeZone;

public SqlTimestampType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
super(getTypeName(withLocalTimeZone), pos);
this.precision = precision;
this.withLocalTimeZone = withLocalTimeZone;
}

private static String getTypeName(boolean withLocalTimeZone) {
if (withLocalTimeZone) {
return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name();
} else {
return SqlTypeName.TIMESTAMP.name();
}
}

public SqlTypeName getSqlTypeName() {
if (withLocalTimeZone) {
return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
} else {
return SqlTypeName.TIMESTAMP;
}
}

public int getPrecision() {
return this.precision;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword(SqlTypeName.TIMESTAMP.name());
if (this.precision >= 0) {
final SqlWriter.Frame frame =
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
writer.print(precision);
writer.endList(frame);
}
if (this.withLocalTimeZone) {
writer.keyword("WITH LOCAL TIME ZONE");
}
}
}
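
A quick sketch of how the two time types behave: a bare TIME or TIMESTAMP keeps precision -1, so unparse() prints no parenthesized precision; TIME(3) and TIMESTAMP(3) print it; and a trailing WITH LOCAL TIME ZONE, consumed by the WithLocalTimeZone() helper, switches the SqlTypeName to the corresponding *_WITH_LOCAL_TIME_ZONE variant and appends the keyword on output.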
Test usage
@Test
public void createKafkaSourceWithComputedColumnForWatermark() {
String sql = "CREATE TABLE table_source(\n" +
" channel varchar, -- 频道\n" +
" pv int, -- 点击次数\n" +
" xctime varchar, -- yyyyMMddHHmmss格式时间戳,字符串类型\n" +
" ts AS TO_TIMESTAMP(xctime,'yyyyMMddHHmmss'), -- rowtime,必须为TIMESTAMP类型\n" +
" WATERMARK FOR ts AS withOffset( ts , '120000' ) --watermark计算方法,允许2分钟的乱序时间,即允许数据迟到2分钟\n" +
" )WITH(\n" +
" type='kafka11',\n" +
" kafka.bootstrap.servers='mwt:9092',\n" +
" kafka.auto.offset.reset='latest',\n" +
" kafka.kerberos.enabled='false',\n" +
" kafka.data.type='json',\n" +
" kafka.topic='table_source',\n" +
" parallelism='1'\n" +
" )\n";

// create the parser
SqlParser sqlParser = SqlParser.create(sql,
// parser configuration
SqlParser.configBuilder()
// set the parser factory
.setParserFactory(SqlParserImpl.FACTORY)
.setQuoting(Quoting.BACK_TICK)
.setUnquotedCasing(Casing.UNCHANGED)
.setQuotedCasing(Casing.UNCHANGED)
.setConformance(conformance)
.build());

final SqlNode sqlNode;
try {
// parse the SQL statement
sqlNode = sqlParser.parseStmt();
} catch (SqlParseException e) {
throw new RuntimeException("Error while parsing SQL: " + sql, e);
}

// cast the SqlNode to the corresponding parse result class
assert sqlNode instanceof SqlCreateTable;
final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;

SqlIdentifier tableName = sqlCreateTable.getTableName();
LOG.debug("tableName: {}", tableName);

SqlNodeList columns = sqlCreateTable.getColumnList();
LOG.debug("columns: {}", columns);

// set with properties
SqlNodeList propertyList = sqlCreateTable.getPropertyList();
Map<String, String> properties = new HashMap<>();
if (propertyList != null) {
propertyList.getList().forEach(p ->
properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
((SqlTableOption) p).getValueString()));
}
LOG.debug("properties: {}", properties);

LOG.debug("eventTimeField:{}", sqlCreateTable.getEventTimeField());

LOG.debug("maxOutOrderless:{}", sqlCreateTable.getMaxOutOrderless());

String nodeInfo = sqlNode.toString();
LOG.debug("test allows secondary parsing: {}", nodeInfo);
}

CREATE VIEW and CREATE FUNCTION are handled in much the same way; see the example code for details.

CREATE VIEW

SQL syntax
CREATE VIEW table_sink_view AS
SELECT
a.*,b.info
FROM
table_source a
JOIN table_side b ON a.name=b.name;
JavaCC grammar template
/**
* Parses a CREATE VIEW or CREATE OR REPLACE VIEW statement.
* CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS select_statement
*/
SqlCreate SqlCreateView(Span s, boolean replace) : {
SqlIdentifier viewName;
SqlCharStringLiteral comment = null;
SqlNode query;
SqlNodeList fieldList = SqlNodeList.EMPTY;
}
{
<VIEW>
viewName = CompoundIdentifier()
[
fieldList = ParenthesizedSimpleIdentifierList()
]
[ <COMMENT> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}
]
<AS>
query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
return new SqlCreateView(s.pos(), viewName, fieldList, query, replace, comment);
}
}

CREATE FUNCTION

SQL syntax
CREATE FUNCTION StringLengthUdf AS 'com.dtwave.example.flink.udx.udf.StringLengthUdf';
JavaCC grammar template
/**
* Parses a CREATE FUNCTION statement.
*/
SqlCreateFunction SqlCreateFunction(Span s, boolean replace):
{
SqlIdentifier functionName;
SqlNode className;
}
{
<FUNCTION> functionName = CompoundIdentifier()
<AS> className = StringLiteral()
{
return new SqlCreateFunction(s.pos(), functionName, className);
}
}
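
A minimal parse sketch for this statement, reusing the parser configuration from the CREATE TABLE test above (SqlParserImpl.FACTORY is the generated parser factory):

SqlCreateFunction parseCreateFunction(String sql) throws SqlParseException {
    SqlParser parser = SqlParser.create(sql,
        SqlParser.configBuilder()
            .setParserFactory(SqlParserImpl.FACTORY)
            .build());
    // the grammar above returns SqlCreateFunction directly, so the cast is safe here
    return (SqlCreateFunction) parser.parseStmt();
}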

DML

RichInsert

SQL syntax
INSERT INTO table_sink
SELECT
a.name,
a.channel,
a.pv,
a.xctime,
StringLengthUdf(a.channel),
b.info
FROM table_source a
JOIN table_side b ON a.name = b.name
WHERE a.channel='channel1' AND a.pv>0
Define the parse result class
public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
private final SqlNodeList staticPartitions;

private final SqlNodeList extendedKeywords;

public RichSqlInsert(SqlParserPos pos,
SqlNodeList keywords,
SqlNodeList extendedKeywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
SqlNodeList staticPartitions) {
super(pos, keywords, targetTable, source, columnList);
this.extendedKeywords = extendedKeywords;
this.staticPartitions = staticPartitions;
}

/**
* @return the list of partition key-value pairs,
* or an empty list if there is no partition specification.
*/
public SqlNodeList getStaticPartitions() {
return staticPartitions;
}

/**
* Get static partition key value pair as strings.
*
* <p>Caution that we use {@link SqlLiteral#toString()} to get
* the string format of the value literal. If the string format is not
* what you need, use {@link #getStaticPartitions()}.
*
* @return the mapping of column names to values of partition specifications,
* or an empty map if there is no partition specification.
*/
public LinkedHashMap<String, String> getStaticPartitionKVs() {
LinkedHashMap<String, String> ret = new LinkedHashMap<>();
if (this.staticPartitions.size() == 0) {
return ret;
}
for (SqlNode node : this.staticPartitions.getList()) {
SqlProperty sqlProperty = (SqlProperty) node;
String value = SqlLiteral.value(sqlProperty.getValue()).toString();
ret.put(sqlProperty.getKey().getSimple(), value);
}
return ret;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.startList(SqlWriter.FrameTypeEnum.SELECT);
String insertKeyword = "INSERT INTO";
if (isUpsert()) {
insertKeyword = "UPSERT INTO";
} else if (isOverwrite()) {
insertKeyword = "INSERT OVERWRITE";
}
writer.sep(insertKeyword);
final int opLeft = getOperator().getLeftPrec();
final int opRight = getOperator().getRightPrec();
getTargetTable().unparse(writer, opLeft, opRight);
if (getTargetColumnList() != null) {
getTargetColumnList().unparse(writer, opLeft, opRight);
}
writer.newlineAndIndent();
if (staticPartitions != null && staticPartitions.size() > 0) {
writer.keyword("PARTITION");
staticPartitions.unparse(writer, opLeft, opRight);
writer.newlineAndIndent();
}
getSource().unparse(writer, 0, 0);
}

//~ Tools ------------------------------------------------------------------

public static boolean isUpsert(List<SqlLiteral> keywords) {
for (SqlNode keyword : keywords) {
SqlInsertKeyword keyword2 =
((SqlLiteral) keyword).symbolValue(SqlInsertKeyword.class);
if (keyword2 == SqlInsertKeyword.UPSERT) {
return true;
}
}
return false;
}

/**
* Returns whether the insert mode is overwrite (for whole table or for specific partitions).
*
* @return true if this is overwrite mode
*/
public boolean isOverwrite() {
return getModifierNode(RichSqlInsertKeyword.OVERWRITE) != null;
}

private SqlNode getModifierNode(RichSqlInsertKeyword modifier) {
for (SqlNode keyword : extendedKeywords) {
RichSqlInsertKeyword keyword2 =
((SqlLiteral) keyword).symbolValue(RichSqlInsertKeyword.class);
if (keyword2 == modifier) {
return keyword;
}
}
return null;
}

@Override
public void validate() throws SqlParseException {
// no-op
}
}
JavaCC grammar template
/**
* Parses an INSERT statement.
*/
SqlNode RichSqlInsert() :
{
final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
final SqlNodeList keywordList;
final List<SqlLiteral> extendedKeywords = new ArrayList<SqlLiteral>();
final SqlNodeList extendedKeywordList;
SqlNode table;
SqlNodeList extendList = null;
SqlNode source;
final SqlNodeList partitionList = new SqlNodeList(getPos());
SqlNodeList columnList = null;
final Span s;
}
{
(
<INSERT>
|
<UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
)
(
<INTO>
|
<OVERWRITE> {
if (!((FlinkSqlConformance) this.conformance).allowInsertOverwrite()) {
throw new ParseException("OVERWRITE expression is only allowed for HIVE dialect");
} else if (RichSqlInsert.isUpsert(keywords)) {
throw new ParseException("OVERWRITE expression is only used with INSERT mode");
}
extendedKeywords.add(RichSqlInsertKeyword.OVERWRITE.symbol(getPos()));
}
)
{ s = span(); }
SqlInsertKeywords(keywords) {
keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
extendedKeywordList = new SqlNodeList(extendedKeywords, s.addAll(extendedKeywords).pos());
}
table = CompoundIdentifier()
[
LOOKAHEAD(5)
[ <EXTEND> ]
extendList = ExtendList() {
table = extend(table, extendList);
}
]
[
LOOKAHEAD(2)
{ final Pair<SqlNodeList, SqlNodeList> p; }
p = ParenthesizedCompoundIdentifierList() {
if (p.right.size() > 0) {
table = extend(table, p.right);
}
if (p.left.size() > 0) {
columnList = p.left;
}
}
]
source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
return new RichSqlInsert(s.end(source), keywordList, extendedKeywordList, table, source,
columnList, partitionList);
}
}
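
Parsing the INSERT statement from the SQL syntax section then follows the same pattern as the earlier tests; a sketch of inspecting the extended keywords on the result:

SqlNode node = sqlParser.parseStmt();
if (node instanceof RichSqlInsert) {
    RichSqlInsert insert = (RichSqlInsert) node;
    LOG.debug("overwrite: {}", insert.isOverwrite());
    LOG.debug("static partitions: {}", insert.getStaticPartitionKVs());
}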
Test usage

Generating a unified Operation
SqlToOperationConverter.java