【Flink系列七】構建實時計算平台——校驗FlinkSQL


前言

FlinkSQL 和常見的SQL一樣,也分為 DDL,DML,DQL,DCL。

本文的主要內容是探討如何利用FlinkAPI 對多行SQL語句進行校驗。

SQL語言共分為四大類:數據查詢語言DQL,數據操縱語言DML,數據定義語言DDL,數據控制語言DCL。

以下是幾個例子

DDL DML DQL DCL
CREATE TABLE... INSERT/UPDATE/DELETE... SELECT GRANT ROLLBACK/COMMNIT

校驗

校驗可以利用Calcite 結合Flink的一些API來進行校驗。

Parser

org.apache.flink.table.delegation.Parser

public interface Parser {

    /**
     * Entry point for parsing SQL queries expressed as a String.
     *
     * <p><b>Note:</b>If the created {@link Operation} is a {@link QueryOperation} it must be in a
     * form that will be understood by the {@link Planner#translate(List)} method.
     *
     * <p>The produced Operation trees should already be validated.
     *
     * @param statement the SQL statement to evaluate
     * @return parsed queries as trees of relational {@link Operation}s
     * @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement
     */
    List<Operation> parse(String statement);

    /**
     * Entry point for parsing SQL identifiers expressed as a String.
     *
     * @param identifier the SQL identifier to parse
     * @return parsed identifier
     * @throws org.apache.flink.table.api.SqlParserException when failed to parse the identifier
     */
    UnresolvedIdentifier parseIdentifier(String identifier);

    /**
     * Entry point for parsing SQL expressions expressed as a String.
     *
     * @param sqlExpression the SQL expression to parse
     * @param inputSchema the schema of the fields in sql expression
     * @return resolved expression
     * @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
     */
    ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);
}


org.apache.flink.table.planner.delegation.ParserImpl

如何獲取這個Parser?

//1. 先創建ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
	.useBlinkPlanner()
	.inStreamingMode()
	.build();

tableEnv = StreamTableEnvironment.create(env, settings);
//tableEnv.registerCatalog...
//2. 從這個TableEnv中獲取ParserImpl
Parser parserImpl = ((TableEnvironmentImpl) tableEnv).getParser();

思考

有了這個Parser就可以校驗單條SQL了。那么如果有多條語句怎么校驗?


多行SQL校驗

有兩種思路

  1. 用換行符號分割,一般;\n作為一條SQL。
  2. 用Calcite的API進行分割。
    這里只講述Calcite的方式

用Calcite進行多條語句分割

SqlParser.Config sqlParserConfig = ...
SqlParser calciteSqlParser = SqlParser.create(sql, sqlParserConfig);
SqlNodeList sqlNodes = calciteSqlParser.parseStmtList();

這里就獲得了SqlNodes,每一個SqlNode就是一行語句。

如何對SqlNode進行校驗

這里就有一個問題:
org.apache.flink.table.delegation.Parser
的方法都是針對SQL 字符串的,形參不是SqlNode。

//ParserImpl的方法

    @Override
    public List<Operation> parse(String statement) {
        CalciteParser parser = calciteParserSupplier.get();
        FlinkPlannerImpl planner = validatorSupplier.get();
        // parse the sql query
        SqlNode parsed = parser.parse(statement);

        Operation operation =
                SqlToOperationConverter.convert(planner, catalogManager, parsed)
                        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
        return Collections.singletonList(operation);
    }

可以看到這里也是用的calciteParser把Statement轉換成SqlNode。我們已經轉換成SqlNode了,也就是直接使用

SqlToOperationConverter.convert(planner, catalogManager, parsed)

即可。

Planner和catalogManager,如何獲取

這里的Planner是FlinkPlannerImpl,只能是這個,他沒有接口

前面提到的tableEnv實際上就是TableEnvironmentImpl類的實例了。

可以通過TableEnvironmentImpl類來獲取CatalogManagerPlanner
這個就是StreamPlanner了,他是Planner接口的實現類。

利用這個StreamPlanner可以創建一個FlinkPlannerImpl實例。

CatalogManager catalogManager = ((TableEnvironmentImpl) tableEnv).getCatalogManager();
StreamPlanner planner = (StreamPlanner) ((TableEnvironmentImpl) tableEnv).getPlanner();

//創建實例
FlinkPlannerImpl flinkPlanner = planner.createFlinkPlanner();

最終校驗的表達式

SqlNode node = ...
Optional<Operation> operationOptional = SqlToOperationConverter.convert(planner, catalogManager, node)

通過InstanceOf 可以判斷Operation具體的類別,看看是對應DML,DQL,DDL的哪一種。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM