Apache Calcite 是一个动态数据的管理框架,可以用来构建数据库系统的语法解析模块,不包含数据存储、数据处理等功能。
Calcite 只是对各种数据库(不同的数据源)的查询进行了封装,并对外提供了统一的查询入口。可以将 Calcite 理解成一个不包含存储层的数据库,它不需要关心任何文件格式。
编译Calcite源码
1 | git clone https://github.com/apache/calcite.git |
csv 示例
1 | cd example/csv |
model.json
使用sqlline来连接Calcite,这个工程里面包含了SQL shell
脚本:
1 | ./sqlline |
target/test-classes/model.json的内容如下:
1 | { |
target/test-classes/sales目录下包含的文件将会被转换成3张表:
1 | DEPTS.csv |
执行元数据查询:
1 | 0: jdbc:calcite:model=target/test-classes/mod> !tables |
查询emps表的数据:
1 | 0: jdbc:calcite:model=target/test-classes/mod> SELECT * FROM emps; |
加入JOIN和GROUP BY操作:
1 | 0: jdbc:calcite:model=target/test-classes/mod> SELECT d.name, COUNT(*) |
VALUES操作能够生成单独的一行,可以方便的用来测试表达式和内置的SQL函数:
1 | 0: jdbc:calcite:model=target/test-classes/mod> VALUES CHAR_LENGTH('Hello, ' || 'world!'); |
model-with-view.json
1 | ./sqlline |
target/test-classes/model-with-view.json的内容如下:
1 | { |
执行元数据查询,多了一个FEMALE_EMPS视图:
1 | 0: jdbc:calcite:model=target/test-classes/mod> !tables |
查询FEMALE_EMPS视图,就像查询表一样查询它:
1 | 0: jdbc:calcite:model=target/test-classes/mod> SELECT e.name, d.name FROM female_emps AS e JOIN depts AS d on e.deptno = d.deptno; |
model-with-custom-table.json
自定义表由用户自定义的代码来实现。
1 | ./sqlline |
target/test-classes/model-with-custom-table.json的内容如下:
1 | { |
执行元数据查询:
1 | 1: jdbc:calcite:model=target/test-classes/mod> !tables |
查询自定义表emps的数据:
1 | 1: jdbc:calcite:model=target/test-classes/mod> SELECT empno, name FROM custom_table.emps; |
explain:
1 | sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin |
target/test-classes/smart.json的内容如下:
1 | { |
代数
// TODO 未完待续
适配器
// TODO 未完待续
流
Calcite 扩展了 SQL 和关系代数,以支持流式查询。流是连续的永远的流记录的集合。与表不同,它们通常不存储在磁盘上,而是通过网络流动,在内存中保存较短时间。
流补充了表,流表示当前和未来的情况,表则表示过去。与表一样,通常希望使用基于关系代数的高级语言查询流,根据模式进行验证,并优化以利用可用的资源和算法。
流表 JOIN
如果表的内容没有发生变化,那么流到表的连接很简单,这个查询用每个产品的单价丰富了订单流,也就是通常所说的生成宽表。
1 | SELECT STREAM o.rowtime, o.productId, o.orderId, o.units, |
但是如果表发生变化,例如,假设产品10的单价在11点增加到0.35。11点之前的订单应该是旧价格,11点之后的订单应该是新价格。
实现这一功能的一种方法是创建一个表,其中包含每个版本的开始和结束生效日期。即对维表进行版本控制。
1 | SELECT STREAM * |
流流 JOIN
如果连接条件以某种方式强制两个流之间保持有限距离,那么连接两个流是有意义的。
在以下查询中,发货日期为订单日期后1小时内:
1 | SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime |
相当多的订单没有出现,因为它们在一个小时之内没有发货。当系统接收到时间戳为11:24:11的订单10时,它已经从散列表中删除了包括时间戳为10:18:07的订单8在内的订单。
流式SQL
sql-window
流SQL中的窗口概念:
- 滚动窗口 Tumbling Window
将元素分配给每个固定长度的窗口,滚动窗口具有固定的尺寸,不重叠元素
- 滑动窗口 Sliding Window
滑动窗口将元素分配给固定长度的窗口,并且附加每次窗口的滑动频率,可以存在窗口重叠的情况
- 会话窗口 Session Window
按照会话元素进行分组,会话窗口不重叠,没有固定的开始时间和结束时间,当一定时间没有接收到新的元素的话,则会话窗口关闭
watermark
Watermark 和 Calcite 基本无关,和流式SQL有关。数据流中经常出现事件时间(Event Time)乱序的情况。
Flink Watermark设计:
- 周期Watermark(Periodic Watermark)
Periodic Watermark 按照固定时间间隔生成新的 watermark,不管是否有新的消息抵达。在两次 watermark 生成时间间隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的watermark。
1 | class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { |
其中extractTimestamp用于从消息中提取事件时间,而getCurrentWatermark用于生成新的水位线,新的水位线只有大于当前水位线才是有效的。每个窗口都会有该类的一个实例,因此可以利用实例的成员变量保存状态,比如上例中的当前最大时间戳。
- 标点Watermark(Punctuated Watermark)
标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。
1 | class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { |
其中extractTimestamp用于从消息中提取事件时间,checkAndGetNextWatermark用于检查事件是否标点事件,若是则生成新的水位线。不同于定期水位线定时调用getCurrentWatermark,标点水位线是每接受一个事件就需要调用checkAndGetNextWatermark,若返回值非 null 且新水位线大于当前水位线,则触发窗口计算。
解析器Parser
fmpp
FMPP是以 freemarker 为模板的模板生成器。
添加 Maven 依赖
1 | <plugin> |
fmpp 配置文件
config.fmpp:
1 | # 用data标示为变量 |
freemarker 模板
模板1:
1 | public class Main { |
模板2:
1 | static { |
生成代码文件
执行 maven 插件:
1 | mvn fmpp:generate |
在target/generated-sources/fmpp目录下会生成如下代码文件:
1 | public class Main { |
javacc
使用递归下降语法解析,LL(k)
其中,第一个L表示从左到右扫描输入;
第二个L表示每次都进行最左推导(在推导语法树的过程中每次都替换句型中最左的非终结符为终结符)
k表示每次向前探索(lookahead)k个终结符
语法描述文件
1 | options { |
示例Parser.tdd文件中可以定义如下:
1 | package: "com.matty.flink.sql.parser.impl", |
类介绍
会在 target/generated-sources 的 package 包下生成如下几个类:
- XXX
解析类入口,如上面指定的SqlParserImpl
- SimpleCharStream
SimpleCharStream jj_input_stream;
词法分析器的输入流
1 | // 构造函数种类 ,可以接受Reader和InputStream |
- XXXConstants
Token常量,包括 SKIP TOKEN 和 TOKEN
示例Parser.tdd文件中可以定义如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// 忽略的字符
SKIP:{
" "
}
// 关键字
TOKEN:{
<PLUS :"+">
}
// 新增的关键字列表。如果关键字不是一个保留关键字,将其添加到"无保留关键字"部分。
keywords:[
"WATERMARK",
"offset"
]
// "keywords"部分中未保留的关键字列表
nonReservedKeywords:[
"offset"
]
对应生成的java类为:
1 | // 和常量申明对应 |
- XXXTokenManager
词法分析器
1 | // 常见方法说明 |
- Token
1 | public class Token { |
ParseException
语法解析异常TokenMgrError
语法错误提示
语法介绍
java代码
java代码块用{}
声明1
2
3
4
5
6
7
8
9// 定义java代码块
void javaCodeDemo():
{}
{
{
int i = 0;
System.out.println(i);
}
}java函数
需要使用JAVACODE声明1
2
3JAVACODE void print(Token t){
System.out.println(t);
}条件
if语句 []
1
2
3
4
5
6
7
8
9
10
11
12
13
14// if语句
void ifExpr():
{}
{
[
<SELECT>
{
System.out.println("if select");
}
]
// 循环,出现一次
(<SELECT>)?
}if else 语句 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// if - else
void ifElseExpr():
{}
{
(
<SELECT> {System.out.println("if else select");}
|
<UPDATE> {System.out.println("if else update");}
|
<DELETE> {System.out.println("if else delete");}
|
{
System.out.println("other");
}
)
}循环
while 0~n
1
2
3
4
5
6// while 0~n
void while1Expr():{
}
{
(<SELECT>)*
}while 1~n
1
2
3
4
5
6// while 1~n
void while2Expr():{
}
{
(<SELECT>)+
}
- 正则表达式
[]
: 内容可选+
: 内容出现1次或多次*
: 内容出现0次或多次?
: 内容出现0次或1次|
: 或()
: 优先级改变或整体操作
sql优化
优化过程一般为以下过程:
- 对SQL进行词法分析得到一个语法树
- 根据关系代数进行SQL的逻辑优化
- 根据代价估算算法进行物理查询的优化
- 执行器执行
sql执行
根据不同的Node定义了代码的实现方法,从最底层的RelNode依次执行,采用source接收数据,sink发送数据。