Apache Calcite

Apache Calcite is a dynamic data management framework. It can be used to build the SQL-parsing layer of a database system, but it includes no data storage or data processing of its own.
Calcite simply wraps queries against different databases (different data sources) and exposes a single, unified query entry point. You can think of Calcite as a database without a storage layer; it does not need to care about any file format.

Building Calcite from source

$ git clone https://github.com/apache/calcite.git
$ cd calcite
$ mvn install -DskipTests -Dcheckstyle.skip=true

CSV example

$ cd example/csv

model.json

Use sqlline to connect to Calcite; the project ships with the SQL shell script:

./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin

The contents of target/test-classes/model.json are as follows:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales"
      }
    }
  ]
}

The files under the target/test-classes/sales directory are exposed as three tables:

DEPTS.csv
EMPS.csv.gz
SDEPTS.csv

Run a metadata query:

0: jdbc:calcite:model=target/test-classes/mod> !tables
+-----------+-------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+-------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| | SALES | DEPTS | TABLE | | | | | | |
| | SALES | EMPS | TABLE | | | | | | |
| | SALES | SDEPTS | TABLE | | | | | | |
| | metadata | COLUMNS | SYSTEM TABLE | | | | | | |
| | metadata | TABLES | SYSTEM TABLE | | | | | | |
+-----------+-------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+

Query the emps table:

0: jdbc:calcite:model=target/test-classes/mod> SELECT * FROM emps;
+-------+-------+--------+--------+---------------+-------+------+---------+---------+------------+
| EMPNO | NAME | DEPTNO | GENDER | CITY | EMPID | AGE | SLACKER | MANAGER | JOINEDAT |
+-------+-------+--------+--------+---------------+-------+------+---------+---------+------------+
| 100 | Fred | 10 | | | 30 | 25 | true | false | 1996-08-03 |
| 110 | Eric | 20 | M | San Francisco | 3 | 80 | | false | 2001-01-01 |
| 110 | John | 40 | M | Vancouver | 2 | null | false | true | 2002-05-03 |
| 120 | Wilma | 20 | F | | 1 | 5 | | true | 2005-09-07 |
| 130 | Alice | 40 | F | Vancouver | 2 | null | false | true | 2007-01-01 |
+-------+-------+--------+--------+---------------+-------+------+---------+---------+------------+
5 rows selected (1.377 seconds)

Add JOIN and GROUP BY operations:

0: jdbc:calcite:model=target/test-classes/mod> SELECT d.name, COUNT(*)
. . . . . . . . . . . . . . . . . . semicolon> FROM emps AS e
. . . . . . . . . . . . . . . . . . semicolon> JOIN depts AS d ON e.deptno = d.deptno
. . . . . . . . . . . . . . . . . . semicolon> GROUP BY d.name;
+-----------+--------+
| NAME | EXPR$1 |
+-----------+--------+
| Sales | 1 |
| Marketing | 2 |
+-----------+--------+
2 rows selected (0.353 seconds)

The VALUES operator generates a single row, which is convenient for testing expressions and built-in SQL functions:

0: jdbc:calcite:model=target/test-classes/mod> VALUES CHAR_LENGTH('Hello, ' || 'world!');
+--------+
| EXPR$0 |
+--------+
| 13 |
+--------+
1 row selected (0.09 seconds)

model-with-view.json

./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model-with-view.json admin admin

The contents of target/test-classes/model-with-view.json are as follows:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales"
      },
      "tables": [
        {
          "name": "FEMALE_EMPS",
          "type": "view",
          "sql": "SELECT * FROM emps WHERE gender = 'F'"
        }
      ]
    }
  ]
}

Run the metadata query again; it now also shows the FEMALE_EMPS view:

0: jdbc:calcite:model=target/test-classes/mod> !tables
+-----------+-------------+-------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+-------------+-------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| | SALES | DEPTS | TABLE | | | | | | |
| | SALES | EMPS | TABLE | | | | | | |
| | SALES | SDEPTS | TABLE | | | | | | |
| | SALES | FEMALE_EMPS | VIEW | | | | | | |
| | metadata | COLUMNS | SYSTEM TABLE | | | | | | |
| | metadata | TABLES | SYSTEM TABLE | | | | | | |
+-----------+-------------+-------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+

Query the FEMALE_EMPS view exactly as you would a table:

0: jdbc:calcite:model=target/test-classes/mod> SELECT e.name, d.name FROM female_emps AS e JOIN depts AS d on e.deptno = d.deptno;
+-------+-----------+
| NAME | NAME |
+-------+-----------+
| Wilma | Marketing |
+-------+-----------+
1 row selected (0.945 seconds)

model-with-custom-table.json

A custom table is backed by user-defined code.

./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model-with-custom-table.json admin admin

The contents of target/test-classes/model-with-custom-table.json are as follows:

{
  "version": "1.0",
  "defaultSchema": "CUSTOM_TABLE",
  "schemas": [
    {
      "name": "CUSTOM_TABLE",
      "tables": [
        {
          "name": "EMPS",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.csv.CsvTableFactory",
          "operand": {
            "file": "sales/EMPS.csv.gz",
            "flavor": "scannable"
          }
        }
      ]
    }
  ]
}

Run a metadata query:

1: jdbc:calcite:model=target/test-classes/mod> !tables
+-----------+--------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
+-----------+--------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+
| | CUSTOM_TABLE | EMPS | TABLE | | | | | | |
| | metadata | COLUMNS | SYSTEM TABLE | | | | | | |
| | metadata | TABLES | SYSTEM TABLE | | | | | | |
+-----------+--------------+------------+--------------+---------+----------+------------+-----------+---------------------------+----------------+

Query the custom table emps:

1: jdbc:calcite:model=target/test-classes/mod> SELECT empno, name FROM custom_table.emps;
+-------+-------+
| EMPNO | NAME |
+-------+-------+
| 100 | Fred |
| 110 | Eric |
| 110 | John |
| 120 | Wilma |
| 130 | Alice |
+-------+-------+
5 rows selected (0.049 seconds)

explain:

sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin
0: jdbc:calcite:model=target/test-classes/mod> explain plan for select name from emps;
+------------------------------------------------------------------------------------------------------------------------+
| PLAN |
+------------------------------------------------------------------------------------------------------------------------+
| EnumerableCalc(expr#0..9=[{inputs}], NAME=[$t1])
EnumerableInterpreter
BindableTableScan(table=[[SALES, EMPS]])
|
+------------------------------------------------------------------------------------------------------------------------+
1 row selected (0.759 seconds)
0: jdbc:calcite:model=target/test-classes/mod> !connect jdbc:calcite:model=target/test-classes/smart.json admin admin
1: jdbc:calcite:model=target/test-classes/sma> explain plan for select name from emps;
+----------------------------------------------------+
| PLAN |
+----------------------------------------------------+
| CsvTableScan(table=[[SALES, EMPS]], fields=[[1]])
|
+----------------------------------------------------+
1 row selected (0.059 seconds)

The contents of target/test-classes/smart.json are as follows:

{
  "version": "1.0",
  "defaultSchema": "SALES",
  "schemas": [
    {
      "name": "SALES",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
      "operand": {
        "directory": "sales",
        // this makes CsvSchema's createTable method create a CsvTranslatableTable instead of a CsvScannableTable
        "flavor": "TRANSLATABLE"
      }
    }
  ]
}

Algebra

// TODO: to be continued

Adapters

// TODO: to be continued

Calcite extends SQL and relational algebra to support streaming queries. A stream is a continuous, unbounded collection of records. Unlike tables, streams are usually not stored on disk; they flow over the network and are held in memory only briefly.
Streams complement tables: a stream represents the present and the future, while a table represents the past. As with tables, we usually want to query streams with a high-level language based on relational algebra, validate them against a schema, and optimize them to exploit the available resources and algorithms.

Stream-to-table JOIN

If the table's contents do not change, joining a stream to it is straightforward. The following query enriches the order stream with each product's unit price, commonly known as building a wide table:

SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
p.name, p.unitPrice
FROM Orders AS o
JOIN Products AS p
ON o.productId = p.productId;

rowtime | productId | orderId | units | name | unitPrice
----------+-----------+---------+-------+-------+-----------
10:17:00 | 30 | 5 | 4 | Cheese | 17
10:17:05 | 10 | 6 | 1 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | Wine | 6
10:18:07 | 30 | 8 | 20 | Cheese | 17
11:02:00 | 10 | 9 | 6 | Beer | 0.25
11:04:00 | 10 | 10 | 1 | Beer | 0.25
11:09:30 | 40 | 11 | 12 | Bread | 100
11:24:11 | 10 | 12 | 4 | Beer | 0.25

But what if the table changes? Suppose the unit price of product 10 increases to 0.35 at 11:00. Orders placed before 11:00 should get the old price, and orders placed after 11:00 the new price.
One way to implement this is a table that records the start and end effective dates of each version, i.e. a versioned dimension table.

SELECT STREAM *
FROM Orders AS o
JOIN ProductVersions AS p
ON o.productId = p.productId
AND o.rowtime BETWEEN p.startDate AND p.endDate

rowtime | productId | orderId | units | productId1 | name | unitPrice
----------+-----------+---------+-------+-----------+--------+-----------
10:17:00 | 30 | 5 | 4 | 30 | Cheese | 17
10:17:05 | 10 | 6 | 1 | 10 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | 20 | Wine | 6
10:18:07 | 30 | 8 | 20 | 30 | Cheese | 17
11:02:00 | 10 | 9 | 6 | 10 | Beer | 0.35
11:04:00 | 10 | 10 | 1 | 10 | Beer | 0.35
11:09:30 | 40 | 11 | 12 | 40 | Bread | 100
11:24:11 | 10 | 12 | 4 | 10 | Beer | 0.35
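
The versioned lookup this join performs can be sketched in plain Java. All class, field, and method names below are illustrative (not Calcite's), and times are minutes since midnight for brevity:

```java
import java.util.*;

// Each product version carries a [startDate, endDate] validity range, and an
// order's rowtime selects the version that was in effect at that instant.
public class ProductVersions {
    public static final class Version {
        public final long start, end;
        public final double unitPrice;
        public Version(long start, long end, double unitPrice) {
            this.start = start; this.end = end; this.unitPrice = unitPrice;
        }
    }

    private final Map<Integer, List<Version>> versions = new HashMap<>();

    public void add(int productId, long start, long end, double unitPrice) {
        versions.computeIfAbsent(productId, k -> new ArrayList<>())
                .add(new Version(start, end, unitPrice));
    }

    // Implements "o.rowtime BETWEEN p.startDate AND p.endDate".
    public Double priceAt(int productId, long rowtime) {
        for (Version v : versions.getOrDefault(productId, Collections.emptyList())) {
            if (rowtime >= v.start && rowtime <= v.end) {
                return v.unitPrice;
            }
        }
        return null; // no version in effect: the order would not join
    }
}
```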

Stream-to-stream JOIN

Joining two streams makes sense when the join condition somehow forces the two streams to stay a bounded distance apart.
In the following query, the ship time must be within one hour of the order time:

SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;

rowtime | productId | orderId | shipTime
----------+-----------+---------+----------
10:17:00 | 30 | 5 | 10:55:00
10:17:05 | 10 | 6 | 10:20:00
11:02:00 | 10 | 9 | 11:58:00
11:24:11 | 10 | 12 | 11:44:00

Quite a few orders are missing because they were not shipped within an hour. By the time the system receives the 11:24:11 order for product 10, it has already removed from its hash table the orders that can no longer match, including order 8 with timestamp 10:18:07.
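
The bounded state behind this join can be sketched as follows. This is a self-contained illustration with made-up class and method names, not Calcite's implementation:

```java
import java.util.*;

// Pending orders are kept in a hash table for at most one hour of event
// time; anything older is evicted as new events arrive.
public class StreamJoinState {
    private static final long WINDOW_MS = 60 * 60 * 1000L; // 1 hour

    // orderId -> order rowtime, in arrival order
    private final Map<Integer, Long> pendingOrders = new LinkedHashMap<>();

    public void onOrder(int orderId, long rowtime) {
        evict(rowtime);
        pendingOrders.put(orderId, rowtime);
    }

    // Returns the matched order's rowtime, or null if the order was never
    // seen or has already been evicted (shipped more than an hour later).
    public Long onShipment(int orderId, long rowtime) {
        evict(rowtime);
        return pendingOrders.remove(orderId);
    }

    private void evict(long now) {
        pendingOrders.values().removeIf(t -> t < now - WINDOW_MS);
    }

    public boolean isPending(int orderId) {
        return pendingOrders.containsKey(orderId);
    }
}
```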

Streaming SQL

sql-window

Window concepts in streaming SQL:

  1. Tumbling Window

Assigns elements to fixed-length windows. Tumbling windows have a fixed size and do not overlap.

  2. Sliding Window

Assigns elements to fixed-length windows together with a slide interval; windows may overlap.

  3. Session Window

Groups elements by session. Session windows do not overlap and have no fixed start or end time; a session window closes when no new element arrives for a given gap.
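
The first two window kinds can be made concrete with a short sketch of how an event timestamp is assigned to windows. Sizes and slides are in milliseconds, and all names are illustrative; session windows are omitted because they depend on the gap between successive elements rather than on the timestamp alone:

```java
import java.util.*;

public class WindowAssign {

    // Tumbling: each timestamp falls into exactly one fixed-size window.
    public static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Sliding: a timestamp falls into size / slide overlapping windows;
    // returns the start of every window that contains ts, newest first.
    public static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long last = ts - (ts % slide); // last window starting at or before ts
        for (long s = last; s > ts - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }
}
```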

watermark

Watermarks are largely unrelated to Calcite itself; they belong to streaming SQL. Event-time disorder is common in data streams.

Flink's watermark design:

  1. Periodic Watermark

A periodic watermark generates a new watermark at a fixed time interval, whether or not new messages arrive. Messages flowing in between two generations can be used to compute the new watermark.

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
  val maxOutOfOrderness = 3500L // 3.5 seconds
  var currentMaxTimestamp: Long = _

  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = element.getCreationTime()
    currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
    timestamp
  }

  override def getCurrentWatermark(): Watermark = {
    // return the watermark as current highest timestamp minus the out-of-orderness bound
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }
}

Here extractTimestamp extracts the event time from a message, and getCurrentWatermark generates a new watermark; a new watermark takes effect only if it is larger than the current one. Each window operator has its own instance of this class, so instance fields can carry state, such as the current maximum timestamp above.

  2. Punctuated Watermark

A punctuated watermark generates a new watermark when special marker events appear in the stream. Window triggering is then independent of time; it depends on when a marker event is received.

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    element.getCreationTime
  }

  override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
    if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
  }
}

Here extractTimestamp extracts the event time from a message, and checkAndGetNextWatermark checks whether the event is a marker event and, if so, generates a new watermark. Unlike periodic watermarks, where getCurrentWatermark is invoked on a timer, checkAndGetNextWatermark is invoked for every event received; if it returns a non-null value larger than the current watermark, window evaluation is triggered.
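
The trigger protocol just described can be sketched in plain Java. The names are illustrative and this is not Flink's actual API:

```java
// The generator is consulted on every event, and the watermark only
// advances when the returned value exceeds the current one.
public class PunctuatedDriver {
    private long currentWatermark = Long.MIN_VALUE;

    // Returns true if this event advanced the watermark
    // (i.e. window evaluation would be triggered here).
    public boolean onEvent(boolean isMarkerEvent, long eventTime) {
        // stands in for checkAndGetNextWatermark: null for ordinary events
        Long candidate = isMarkerEvent ? eventTime : null;
        if (candidate != null && candidate > currentWatermark) {
            currentWatermark = candidate;
            return true;
        }
        return false;
    }

    public long watermark() {
        return currentWatermark;
    }
}
```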

Parser

fmpp

FMPP is a text-file generator driven by FreeMarker templates.

Add the Maven plugin

<plugin>
  <groupId>com.googlecode.fmpp-maven-plugin</groupId>
  <artifactId>fmpp-maven-plugin</artifactId>
  <version>1.0</version>
  <configuration>
    <!-- configuration file -->
    <cfgFile>src/main/codegen/config.fmpp</cfgFile>
    <!-- output directory -->
    <outputDirectory>target/generated-sources/fmpp/</outputDirectory>
    <!-- template directory -->
    <templateDirectory>src/main/codegen/templates</templateDirectory>
  </configuration>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>generate</goal>
      </goals>
    </execution>
  </executions>
  <dependencies>
    <dependency>
      <groupId>org.freemarker</groupId>
      <artifactId>freemarker</artifactId>
      <version>2.3.28</version>
    </dependency>
    <dependency>
      <groupId>net.sourceforge.fmpp</groupId>
      <artifactId>fmpp</artifactId>
      <version>0.9.16</version>
      <exclusions>
        <exclusion>
          <groupId>org.freemarker</groupId>
          <artifactId>freemarker</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
</plugin>

FMPP configuration file

config.fmpp:

# "data" declares variables
# a variable is referenced as ${one} or ${two.three}; see the FreeMarker syntax
# an external tdd file can also be referenced
# the "include" directive inserts another FreeMarker template
data: {
  one: 1,
  two: {
    three: 3
  }
  implementationFiles: [
    "parserImpls.ftl"
  ]
}

freemarkerLinks: {
  includes: includes/
}

FreeMarker templates

Template 1:

public class Main {
    public static void main(String[] args) {
        System.out.println(${one} + ${two.three});
    }

    /**
     * additional generated code
     */
<#list implementationFiles as file>
    <#include "/@includes/" + file />
</#list>
}

Template 2:

static {
    System.out.println(${one});
    System.out.println(${two.three});
}

Generate the code files

Run the Maven plugin:

mvn fmpp:generate

The following code file is generated under the target/generated-sources/fmpp directory:

public class Main {
    public static void main(String[] args) {
        System.out.println(1 + 3);
    }

    /**
     * additional generated code
     */
    static {
        System.out.println(1);
        System.out.println(3);
    }
}

javacc

JavaCC generates recursive-descent parsers, i.e. LL(k), where:
the first L means the input is scanned from left to right;
the second L means leftmost derivation (at each step, the leftmost nonterminal of the sentential form is expanded);
k is the number of terminals of lookahead.
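
A toy LL(1) recursive-descent parser makes the definition concrete. The grammar here ("digit ('+' digit)*") is invented for illustration and is unrelated to Calcite's actual parser:

```java
// One token of lookahead (k = 1), scanning left to right, always
// expanding the leftmost nonterminal.
public class TinyParser {
    private final String input;
    private int pos;

    public TinyParser(String input) { this.input = input; }

    private char lookahead() {
        return pos < input.length() ? input.charAt(pos) : '\0';
    }

    // expr := digit ('+' digit)*  -- returns the evaluated sum
    public int expr() {
        int value = digit();
        while (lookahead() == '+') { // decide using one lookahead token
            pos++;                   // consume '+'
            value += digit();
        }
        if (pos != input.length()) {
            throw new IllegalArgumentException("unexpected: " + lookahead());
        }
        return value;
    }

    private int digit() {
        char c = lookahead();
        if (c < '0' || c > '9') {
            throw new IllegalArgumentException("expected digit, got: " + c);
        }
        pos++;
        return c - '0';
    }
}
```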

Grammar description file

options {
    // JavaCC options
}

PARSER_BEGIN(ParserClassName)
package some.package.name;
import some.library;

public class ParserClassName {
    // arbitrary Java code
}
PARSER_END(ParserClassName)

// token (lexer) specification

// production (parser) specification

An example Parser.tdd file might define:

package: "com.matty.flink.sql.parser.impl",
class: "SqlParserImpl",
imports: [
  "com.matty.flink.sql.parser.ddl.SqlCreateTable",
  "com.matty.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
  "com.matty.flink.sql.parser.ddl.SqlTableColumn",
  "com.matty.flink.sql.parser.ddl.SqlTableOption",
  ...
  "java.util.List",
  "java.util.ArrayList"
]

Generated classes

The following classes are generated under the configured package in target/generated-sources:

  • XXX
    The parser entry class, e.g. the SqlParserImpl specified above.
  • SimpleCharStream
    SimpleCharStream jj_input_stream;
    The input stream of the lexical analyzer.
// The constructors accept either a Reader or an InputStream
public class SimpleCharStream {

    public SimpleCharStream(java.io.Reader dstream, int startline,
                            int startcolumn, int buffersize);

    public SimpleCharStream(java.io.Reader dstream,
                            int startline, int startcolumn);

    public SimpleCharStream(java.io.Reader dstream);

    public SimpleCharStream(java.io.InputStream dstream, String encoding, int startline,
                            int startcolumn, int buffersize) throws java.io.UnsupportedEncodingException;

    public SimpleCharStream(java.io.InputStream dstream, int startline,
                            int startcolumn, int buffersize);

    public SimpleCharStream(java.io.InputStream dstream, String encoding, int startline,
                            int startcolumn) throws java.io.UnsupportedEncodingException;

    public SimpleCharStream(java.io.InputStream dstream, int startline,
                            int startcolumn);

    public SimpleCharStream(java.io.InputStream dstream, String encoding) throws java.io.UnsupportedEncodingException;

    public SimpleCharStream(java.io.InputStream dstream);
}
  • XXXConstants
    Token constants, including SKIP and TOKEN entries.
    An example Parser.tdd file might define:
    // ignored characters
    SKIP: {
      " "
    }

    // keywords
    TOKEN: {
      <PLUS: "+">
    }

    // newly added keywords; if a keyword is not reserved, add it to the "non-reserved keywords" section
    keywords: [
      "WATERMARK",
      "offset"
    ]

    // keywords from the "keywords" section that are not reserved
    nonReservedKeywords: [
      "offset"
    ]

The corresponding generated Java class:

// matches the constant declarations
public interface XXXConstants {

    int EOF = 0;
    int PLUS = 2;
    int DEFAULT = 0;
    int WATERMARK = 669;
    int offset = 678;

    String[] tokenImage = {
        // EOF: end of file
        "<EOF>",
        // skipped string
        "\" \"",
        // PLUS
        "\"+\"",
        "\"WATERMARK\"",
        "\"offset\""
    };
}
  • XXXTokenManager
    The lexical analyzer (token manager).
// commonly used members
public class XXXTokenManager implements XXXConstants {
    // input stream
    protected SimpleCharStream input_stream;
    // constructor
    public XXXTokenManager(SimpleCharStream stream);
    // get the next token
    public Token getNextToken();
}
  • Token
public class Token {

    // the token kind, an index into XXXConstants
    public int kind;

    // begin/end line and column
    public int beginLine, beginColumn, endLine, endColumn;

    // the string image of the token
    public String image;

    // the next token
    public Token next;

    // special token
    public Token specialToken;

    // Returns the image.
    public String toString()
    {
        return image;
    }

}
  • ParseException
    Thrown on a parse (syntax) error.

  • TokenMgrError
    Raised on a lexical error.

Grammar notes

  • Java code
    A Java code block is declared with {}:

    // define a Java code block
    void javaCodeDemo():
    {}
    {
        {
            int i = 0;
            System.out.println(i);
        }
    }
  • Java methods
    Must be declared with JAVACODE:

    JAVACODE void print(Token t) {
        System.out.println(t);
    }
  • Conditionals

  1. if statement: []

    // if statement
    void ifExpr():
    {}
    {
        [
            <SELECT>
            {
                System.out.println("if select");
            }
        ]

        // appears zero or one time
        (<SELECT>)?
    }
  2. if-else statement: |

    // if-else
    void ifElseExpr():
    {}
    {
        (
            <SELECT> { System.out.println("if else select"); }
        |
            <UPDATE> { System.out.println("if else update"); }
        |
            <DELETE> { System.out.println("if else delete"); }
        |
            {
                System.out.println("other");
            }
        )
    }
  3. Loops

  • while 0~n

    // while, 0 to n occurrences
    void while1Expr():
    {}
    {
        (<SELECT>)*
    }
  • while 1~n

    // while, 1 to n occurrences
    void while2Expr():
    {}
    {
        (<SELECT>)+
    }
  • Regular-expression operators
    []: optional
    +: one or more occurrences
    *: zero or more occurrences
    ?: zero or one occurrence
    |: alternation
    (): grouping / precedence

SQL optimization

Optimization generally follows these steps:

  1. Lexical and syntactic analysis of the SQL produces a syntax tree
  2. Logical optimization of the SQL based on relational algebra
  3. Physical optimization based on cost estimation
  4. The executor runs the plan

SQL execution

Each kind of node defines its own implementation. Execution starts from the bottom-most RelNode and works upward; a node receives data through a source and emits data through a sink.

AggregateNode

FilterNode

ProjectNode

SortNode

WindowNode
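
The source/sink style above can be roughly sketched as follows. The interfaces are illustrative, not Calcite's actual node classes:

```java
import java.util.function.Predicate;

// A FilterNode pulls rows from its source and pushes matching rows
// into its sink.
public class NodeDemo {
    public interface Source { Object[] pull(); }      // null when exhausted
    public interface Sink { void push(Object[] row); }

    // What a filter node does: forward only the rows matching a condition.
    public static void runFilter(Source source, Predicate<Object[]> condition, Sink sink) {
        for (Object[] row; (row = source.pull()) != null; ) {
            if (condition.test(row)) {
                sink.push(row);
            }
        }
    }
}
```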

References

SQL 解析框架 - Calcite