Flink DQL SQL With Calcite

DESCRIBE、EXPLAIN、SHOW 等 Statement 相关 SQL 解析。

Flink SQL 解析的代码参考:Flink SQL 解析

Parser.tdd 和 parserImpls.ftl 文件中的相关变更如下:

Parser.tdd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Classes made available to the generated parser; the new productions below
# reference these node classes plus Set/HashSet and ParserResource.
imports: [
...
  "com.matty.flink.sql.parser.dql.SqlShowTables",
  "com.matty.flink.sql.parser.dql.SqlRichExplain",
  "com.matty.flink.sql.parser.dql.SqlRichDescribeTable",
  "com.matty.flink.sql.parser.dml.SqlStatementSet",
  "java.util.Set",
  "java.util.HashSet",
  "com.matty.flink.sql.parser.utils.ParserResource"
]

# Keywords consumed by the DESCRIBE / EXPLAIN / SHOW productions.
keywords: [
...
  "EXTENDED",
  "ESTIMATED_COST",
  "CHANGELOG_MODE",
  "JSON_EXECUTION_PLAN",
  "TABLES"
]

# Productions from parserImpls.ftl that are registered as statement parsers.
statementParserMethods: [
...
  "SqlRichDescribeTable()",
  "SqlRichExplain()",
  "SqlShowTables()"
]
parserImpls.ftl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133

...
/**
 * Parses a describe-table statement:
 * DESCRIBE | DESC [ EXTENDED ] [[catalogName.] dataBasesName].tableName
 *
 * The "Rich" prefix in the class name keeps this node apart from
 * calcite's original SqlDescribeTable.
 */
SqlRichDescribeTable SqlRichDescribeTable() :
{
    SqlParserPos keywordPos;
    SqlIdentifier table;
    boolean extended = false;
}
{
    ( <DESCRIBE> | <DESC> )
    {
        // Position is captured right after the leading keyword.
        keywordPos = getPos();
    }
    [
        <EXTENDED>
        {
            extended = true;
        }
    ]
    table = CompoundIdentifier()
    {
        return new SqlRichDescribeTable(keywordPos, table, extended);
    }
}

/**
* Parses an EXPLAIN statement and wraps the explained statement in a SqlRichExplain node.
*
* Accepted prefixes, tried in this order:
*   EXPLAIN PLAN FOR
*   EXPLAIN detail [, detail ]*   (each detail is parsed by ParseExplainDetail)
*   EXPLAIN
*
* The prefix is followed by a statement set, a query, or an insert statement.
* The details collected by ParseExplainDetail are handed to the node as a set
* (empty when none were given).
*/
SqlNode SqlRichExplain() :
{
SqlNode stmt;
Set<String> explainDetails = new HashSet<String>();
}
{
(
// All three alternatives start with EXPLAIN, so explicit lookahead is used to pick one.
LOOKAHEAD(3) <EXPLAIN> <PLAN> <FOR>
|
// EXPLAIN followed by one or more comma-separated detail keywords.
LOOKAHEAD(2) <EXPLAIN> ParseExplainDetail(explainDetails) ( <COMMA> ParseExplainDetail(explainDetails) )*
|
// Bare EXPLAIN with no details.
<EXPLAIN>
)
(
stmt = SqlStatementSet()
|
stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
|
stmt = RichSqlInsert()
)
{
// NOTE(review): getPos() is read here, after the explained statement has been parsed,
// not right after the EXPLAIN keyword - confirm this is the intended node position.
return new SqlRichExplain(getPos(), stmt, explainDetails);
}
}

/**
 * Parses a statement set and returns it as a SqlStatementSet node.
 *
 * Grammar: STATEMENT SET BEGIN (RichSqlInsert();)+ END
 *
 * At least one insert is required, and every insert must be terminated by a semicolon.
 */
SqlNode SqlStatementSet() :
{
    SqlParserPos beginPos;
    SqlNode parsedInsert;
    List<RichSqlInsert> collected = new ArrayList<RichSqlInsert>();
}
{
    <STATEMENT>
    {
        // The node position is taken right after the STATEMENT keyword.
        beginPos = getPos();
    }
    <SET> <BEGIN>
    (
        parsedInsert = RichSqlInsert() <SEMICOLON>
        {
            collected.add((RichSqlInsert) parsedInsert);
        }
    )+
    <END>
    {
        return new SqlStatementSet(collected, beginPos);
    }
}

/**
 * Parses one EXPLAIN detail keyword (ESTIMATED_COST, CHANGELOG_MODE or
 * JSON_EXECUTION_PLAN) and adds its upper-cased image to explainDetails.
 *
 * A context exception built from ParserResource.RESOURCE.explainDetailIsDuplicate()
 * is thrown when the same detail has already been collected.
 *
 * explainDetails: the set of details collected so far; it is modified in place.
 */
void ParseExplainDetail(Set<String> explainDetails):
{
}
{
    (
        <ESTIMATED_COST>
    |
        <CHANGELOG_MODE>
    |
        <JSON_EXECUTION_PLAN>
    )
    {
        // Upper-case with Locale.ROOT so the stored value does not depend on the
        // JVM default locale (e.g. a Turkish locale maps 'i' to a dotted capital I).
        final String detail = token.image.toUpperCase(java.util.Locale.ROOT);
        // Set.add returns false when the element is already present, which
        // replaces the separate contains() check.
        if (!explainDetails.add(detail)) {
            throw SqlUtil.newContextException(
                getPos(),
                ParserResource.RESOURCE.explainDetailIsDuplicate());
        }
    }
}

/**
* Parses a SHOW TABLES statement:
*
*   SHOW TABLES [ ( FROM | IN ) [catalog.] database ] [ [ NOT ] LIKE 'pattern' ]
*
* Both optional clauses may be omitted. When they are, databaseName, prep and
* likeLiteral are passed to the node as null and notLike as false.
*/
SqlShowTables SqlShowTables() :
{
SqlIdentifier databaseName = null;
SqlCharStringLiteral likeLiteral = null;
// Records which preposition ("FROM" or "IN") introduced the database name, if any.
String prep = null;
boolean notLike = false;
SqlParserPos pos;
}
{
<SHOW> <TABLES>
{ pos = getPos(); }
[
( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
// pos is overwritten here, so with a FROM/IN clause the node position is taken after that keyword.
{ pos = getPos(); }
databaseName = CompoundIdentifier()
]
[
[
<NOT>
{
notLike = true;
}
]
<LIKE> <QUOTED_STRING>
{
// token is the QUOTED_STRING just consumed; its image is passed through SqlParserUtil.parseString.
String likeCondition = SqlParserUtil.parseString(token.image);
likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());
}
]
{
return new SqlShowTables(pos, prep, databaseName, notLike, likeLiteral);
}
}

DESCRIBE

SqlRichDescribeTableTest.java(git 地址)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Parser test for DESCRIBE / DESC statements.
 */
public class SqlRichDescribeTableTest extends BaseParser {

    /**
     * Parses {@code DESC Orders}, checks that the result is a {@link SqlRichDescribeTable}
     * and logs the table name components the node exposes.
     */
    @Test
    public void describeTable() {
        String sql = "DESC Orders";

        final SqlNode sqlNode = parseStmtAndHandleEx(sql);
        assert sqlNode instanceof SqlRichDescribeTable;
        final SqlRichDescribeTable sqlRichDescribeTable = (SqlRichDescribeTable) sqlNode;

        // SqlRichDescribeTable -> DescribeTableOperation
        // catalogManager.getTable(ObjectIdentifier)
        String[] fullTableName = sqlRichDescribeTable.fullTableName();
        // Render the array explicitly: a String[] passed straight to a varargs logging
        // method is spread into separate arguments, so only its first element would
        // fill the single {} placeholder.
        LOG.debug("fullTableName: {}", java.util.Arrays.toString(fullTableName));
    }
}

EXPLAIN

SqlRichExplainTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Parser test for EXPLAIN statements that carry explain details.
 */
public class SqlRichExplainTest extends BaseParser {

    /**
     * Parses an EXPLAIN with all three detail keywords in front of a UNION ALL query,
     * checks the node type and the kind of the explained statement, then logs the
     * collected details and the operand list.
     */
    @Test
    public void explainForCBO() {
        final String explainSql =
                "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN \n"
                        + " SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' \n"
                        + " UNION ALL \n"
                        + " SELECT `count`, word FROM MyTable2";

        final SqlNode parsed = parseStmtAndHandleEx(explainSql);
        assert parsed instanceof SqlRichExplain;
        final SqlRichExplain explainNode = (SqlRichExplain) parsed;

        // ExplainOperation(PlannerQueryOperation)
        final SqlNode explained = explainNode.getStatement();
        assert explained.getKind().belongsTo(SqlKind.QUERY);

        final Set<String> details = explainNode.getExplainDetails();
        LOG.debug("explainDetails: {}", details);

        final List<SqlNode> operands = explainNode.getOperandList();
        LOG.debug("operandList: {}", operands);
    }
}

SHOW

SqlShowTablesTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Parser test for SHOW TABLES statements.
 */
public class SqlShowTablesTest extends BaseParser {

    /**
     * Parses a bare {@code show tables}, checks the node type and logs its operand list.
     */
    @Test
    public void showTables() {
        final String showSql = "show tables";

        final SqlNode parsed = parseStmtAndHandleEx(showSql);
        assert parsed instanceof SqlShowTables;
        final SqlShowTables showTablesNode = (SqlShowTables) parsed;

        // SqlShowTables -> ShowTablesOperation -> catalogManager.listTables
        final List<SqlNode> operands = showTablesNode.getOperandList();
        LOG.debug("operandList: {}", operands);
    }
}