前言
FlinkSQL 和常見的SQL一樣,也分為 DDL,DML,DQL,DCL。
本文的主要內容是探討如何利用FlinkAPI 對多行SQL語句進行校驗。
SQL語言共分為四大類:數據查詢語言DQL,數據操縱語言DML,數據定義語言DDL,數據控制語言DCL。
以下是幾個例子
DDL | DML | DQL | DCL |
---|---|---|---|
CREATE TABLE... | INSERT/UPDATE/DELETE... | SELECT | GRANT ROLLBACK/COMMNIT |
校驗
校驗可以利用Calcite 結合Flink的一些API來進行校驗。
Parser
org.apache.flink.table.delegation.Parser
public interface Parser {
/**
* Entry point for parsing SQL queries expressed as a String.
*
* <p><b>Note:</b>If the created {@link Operation} is a {@link QueryOperation} it must be in a
* form that will be understood by the {@link Planner#translate(List)} method.
*
* <p>The produced Operation trees should already be validated.
*
* @param statement the SQL statement to evaluate
* @return parsed queries as trees of relational {@link Operation}s
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement
*/
List<Operation> parse(String statement);
/**
* Entry point for parsing SQL identifiers expressed as a String.
*
* @param identifier the SQL identifier to parse
* @return parsed identifier
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the identifier
*/
UnresolvedIdentifier parseIdentifier(String identifier);
/**
* Entry point for parsing SQL expressions expressed as a String.
*
* @param sqlExpression the SQL expression to parse
* @param inputSchema the schema of the fields in sql expression
* @return resolved expression
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
*/
ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);
}
org.apache.flink.table.planner.delegation.ParserImpl
如何獲取這個Parser?
//1. 先創建ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tableEnv = StreamTableEnvironment.create(env, settings);
//tableEnv.registerCatalog...
//2. 從這個TableEnv中獲取ParserImpl
Parser parserImpl = ((TableEnvironmentImpl) tableEnv).getParser();
思考
有了這個Parser就可以校驗單條SQL了。那么如果有多條語句怎么校驗?
多行SQL校驗
有兩種思路
- 用換行符號分割,一般
;\n
作為一條SQL。 - 用Calcite的API進行分割。
這里只講述Calcite的方式
用Calcite進行多條語句分割
SqlParser.Config sqlParserConfig = ...
SqlParser calciteSqlParser = SqlParser.create(sql, sqlParserConfig);
SqlNodeList sqlNodes = calciteSqlParser.parseStmtList();
這里就獲得了SqlNodes,每一個SqlNode就是一行語句。
如何對SqlNode進行校驗
這里就有一個問題:
org.apache.flink.table.delegation.Parser
的方法都是針對SQL 字符串的,形參不是SqlNode。
//ParserImpl的方法
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation =
SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
可以看到這里也是用的calciteParser把Statement轉換成SqlNode。我們已經轉換成SqlNode了,也就是直接使用
SqlToOperationConverter.convert(planner, catalogManager, parsed)
即可。
Planner和catalogManager,如何獲取
這里的Planner是FlinkPlannerImpl
,只能是這個,他沒有接口。
前面提到的tableEnv
實際上就是TableEnvironmentImpl
類的實例了。
可以通過TableEnvironmentImpl
類來獲取CatalogManager
和Planner
這個就是StreamPlanner
了,他是Planner
接口的實現類。
利用這個StreamPlanner
可以創建一個FlinkPlannerImpl
實例。
CatalogManager catalogManager = ((TableEnvironmentImpl) tableEnv).getCatalogManager();
StreamPlanner planner = (StreamPlanner) ((TableEnvironmentImpl) tableEnv).getPlanner();
//創建實例
FlinkPlannerImpl flinkPlanner = planner.createFlinkPlanner();
最終校驗的表達式
SqlNode node = ...
Optional<Operation> operationOptional = SqlToOperationConverter.convert(planner, catalogManager, node)
通過InstanceOf 可以判斷Operation具體的類別,看看是對應DML,DQL,DDL的哪一種。