Flink學習(八) Flink SQL & Table 編程和案例


Flink Table & SQL 概述
背景
我們在前面的課時中講過 Flink 的分層模型,Flink 自身提供了不同級別的抽象來支持我們開發流式或者批量處理程序,下圖描述了 Flink 支持的 4 種不同級別的抽象。

 

Table API 和 SQL 處於最頂端,是 Flink 提供的高級 API 操作。Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標准 SQL 語義的開發語言。

Flink 在編程模型上提供了 DataStream 和 DataSet 兩套 API,並沒有做到事實上的批流統一,因為用戶和開發者還是開發了兩套代碼。正是因為 Flink Table & SQL 的加入,可以說 Flink 在某種程度上做到了事實上的批流一體。

原理
你之前可能都了解過 Hive,在離線計算場景下 Hive 幾乎扛起了離線數據處理的半壁江山。它的底層對 SQL 的解析用到了 Apache Calcite,Flink 同樣把 SQL 的解析、優化和執行教給了 Calcite。

下圖是一張經典的 Flink Table & SQL 實現原理圖,可以看到 Calcite 在整個架構中處於絕對核心地位。

 

 

 

從圖中可以看到無論是批查詢 SQL 還是流式查詢 SQL,都會經過對應的轉換器 Parser 轉換成為節點樹 SQLNode tree,然后生成邏輯執行計划 Logical Plan,邏輯執行計划在經過優化后生成真正可以執行的物理執行計划,交給 DataSet 或者 DataStream 的 API 去執行。

在這里不對 Calcite 的原理過度展開,有興趣的可以直接在官網上學習。

一個完整的 Flink Table & SQL Job 也是由 Source、Transformation、Sink 構成:

 

 

 

Source 部分來源於外部數據源,我們經常用的有 Kafka、MySQL 等;
Transformation 部分則是 Flink Table & SQL 支持的常用 SQL 算子,比如簡單的 Select、Groupby 等,當然在這里也有更為復雜的多流 Join、流與維表的 Join 等;
Sink 部分是指的結果存儲比如 MySQL、HBase 或 Kakfa 等。


動態表
與傳統的表 SQL 查詢相比,Flink Table & SQL 在處理流數據時會時時刻刻處於動態的數據變化中,所以便有了一個動態表的概念。

動態表的查詢與靜態表一樣,但是,在查詢動態表的時候,SQL 會做連續查詢,不會終止。

我們舉個簡單的例子,Flink 程序接受一個 Kafka 流作為輸入,Kafka 中為用戶的購買記錄:

 

 

 

首先,Kafka 的消息會被源源不斷的解析成一張不斷增長的動態表,我們在動態表上執行的 SQL 會不斷生成新的動態表作為結果表。

 

Flink Table & SQL 算子和內置函數
我們在講解 Flink Table & SQL 所支持的常用算子前,需要說明一點,Flink 自從 0.9 版本開始支持 Table & SQL 功能一直處於完善開發中,且在不斷進行迭代。

我們在官網中也可以看到這樣的提示:

Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.

Flink Table & SQL 的開發一直在進行中,並沒有支持所有場景下的計算邏輯。從我個人實踐角度來講,在使用原生的 Flink Table & SQL 時,務必查詢官網當前版本對 Table & SQL 的支持程度,盡量選擇場景明確,邏輯不是極其復雜的場景。

常用算子
目前 Flink SQL 支持的語法主要如下:

query: values
  | { select
      | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ]

select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values: VALUES expression [, expression ]* groupItem: expression | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ]
    ')' ...

可以看到 Flink SQL 和傳統的 SQL 一樣,支持了包含查詢、連接、聚合等場景,另外還支持了包括窗口、排序等場景。下面我就以最常用的算子來做詳細的講解。

SELECT/AS/WHERE

SELECT、WHERE 和傳統 SQL 用法一樣,用於篩選和過濾數據,同時適用於 DataStream 和 DataSet。

SELECT * FROM Table; SELECT name,age FROM Table;

GROUP BY / DISTINCT/HAVING

GROUP BY 用於進行分組操作,DISTINCT 用於結果去重。
HAVING 和傳統 SQL 一樣,可以用來在聚合函數之后進行篩選。

SELECT DISTINCT name FROM Table; SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name; SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;

JOIN

JOIN 可以用於把來自兩個表的數據聯合起來形成結果表,目前 Flink 的 Join 只支持等值連接。Flink 支持的 JOIN 類型包括:

JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN

例如,用用戶表和商品表進行關聯:

SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer

LEFT JOIN、RIGHT JOIN 、FULL JOIN 相與我們傳統 SQL 中含義一樣。

WINDOW

根據窗口數據划分的不同,目前 Apache Flink 有如下 3 種:

滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;
滑動窗口,窗口數據有固定大小,並且有生成間隔;
會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行划分,窗口數據無疊加;


滾動窗口

滾動窗口的特點是:有固定大小、窗口中的數據不會重疊,如下圖所示:

 

 

 滾動窗口的語法:

SELECT 
    [gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], TUMBLE(timeCol, size)

舉例說明,我們需要計算每個用戶每天的訂單數量:

SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

其中,TUMBLE_START 和 TUMBLE_END 代表窗口的開始時間和窗口的結束時間,TUMBLE (timeLine, INTERVAL '1' DAY) 中的 timeLine 代表時間字段所在的列,INTERVAL '1' DAY 表示時間間隔為一天。

 

滑動窗口

滑動窗口有固定的大小,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的創建頻率。需要注意的是,多個滑動窗口可能會發生數據重疊,具體語義如下:

 

 

 滑動窗口的語法與滾動窗口相比,只多了一個 slide 參數:

SELECT 
    [gk], [HOP_START(timeCol, slide, size)] , [HOP_END(timeCol, slide, size)], agg1(col1), ... aggN(colN) FROM Tab1 GROUP BY [gk], HOP(timeCol, slide, size)

例如,我們要每間隔一小時計算一次過去 24 小時內每個商品的銷量:

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

上述案例中的 INTERVAL '1' HOUR 代表滑動窗口生成的時間間隔。

 

會話窗口

會話窗口定義了一個非活動時間,假如在指定的時間間隔內沒有出現事件或消息,則會話窗口關閉。

 

 會話窗口的語法如下:

SELECT 
    [gk], SESSION_START(timeCol, gap) AS winStart, SESSION_END(timeCol, gap) AS winEnd, agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], SESSION(timeCol, gap)

舉例,我們需要計算每個用戶過去 1 小時內的訂單量:

SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user

內置函數
Flink 中還有大量的內置函數,我們可以直接使用,將內置函數分類如下:

比較函數

邏輯函數

算術函數

字符串處理函數

時間函數

 

Flink Table & SQL 案例
上面分別介紹了 Flink Table & SQL 的原理和支持的算子,我們模擬一個實時的數據流,然后講解 SQL JOIN 的用法。

之前利用 Flink 提供的自定義 Source 功能來實現一個自定義的實時數據源,具體實現如下:

package wyh.tableApi;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import wyh.datastreamingApi.Item;

import java.util.ArrayList;
import java.util.Random;

class MyStreamingSourceTable implements SourceFunction<Item> {

    private boolean isRunning = true;

    /**
     * 重寫run方法產生一個源源不斷的數據發送源
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while(isRunning){
            Item item = generateItem();
            ctx.collect(item);

            //每秒產生一條數據
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    //隨機產生一條商品數據
    private Item generateItem(){
        int i = new Random().nextInt(1000);
        ArrayList<String> list = new ArrayList<>();
        list.add("HAT");
        list.add("TIE");
        list.add("SHOE");

        Item item = new Item();
        item.setName(list.get(new Random().nextInt(3)));
        item.setId(i);


        return item;
    }


}

我們把實時的商品數據流進行分流,分成 even 和 odd 兩個流進行 JOIN,條件是名稱相同,最后,把兩個流的 JOIN 結果輸出。

package wyh.tableApi;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import wyh.datastreamingApi.Item;

import java.util.ArrayList;

public class StreamingDemoTable {
    public static void main(String[] args) throws Exception{
        //BlinkPlanner 對SQl進行了一些優化 比如去重,取TopN
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //流處理環境
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        //支持table sql環境
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSourceTable()).map(new MapFunction<Item, Item>() {
            @Override
            public Item map(Item item) throws Exception {
                return item;
            }
        });

        DataStream<Item> splitAll = source.split(new OutputSelector<Item>() {
            @Override
            public Iterable<String> select(Item item) {
                //將實時商品流分成even和odd兩個流
                ArrayList<String> output = new ArrayList<>();

                if (item.getId() % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        });

        //將兩個流篩選出來
        DataStream<Item> evenSelect = ((SplitStream<Item>) splitAll).select("even");
        DataStream<Item> oddSelect = ((SplitStream<Item>) splitAll).select("odd");

        //把這兩個流在我們的Flink環境中注冊為臨時表
        bsTableEnv.createTemporaryView("evenTable",evenSelect,"name,id");
        bsTableEnv.createTemporaryView("oddTable",oddSelect,"name,id");

        Table quertTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name=b.name");

        quertTable.printSchema();

        bsTableEnv.toRetractStream(quertTable,TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>() {})).print();


        bsEnv.execute("streaming sql job");
    }
}

直接右鍵運行,在控制台可以看到輸出:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM