Flink1.11 SQL Demo: 構建一個端到端的流式應用


本文將基於 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建一個電商用戶行為的實時分析應用。本文所有的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。本實戰演練的最終效果圖:

 

 

 

准備

一台裝有 Docker 的 Linux 或 MacOS 計算機。

使用 Docker Compose 啟動容器

本實戰演示所依賴的組件全都編排到了容器中,因此可以通過 docker-compose 一鍵啟動。你可以通過 wget 命令自動下載該 docker-compose.yml 文件,也可以手動下載。

mkdir flink-sql-demo; cd flink-sql-demo; wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-CN/docker-compose.yml

該 Docker Compose 中包含的容器有:

  • Flink SQL Client: 用於提交 Flink SQL
  • Flink集群: 包含一個 JobManager 和 一個 TaskManager 用於運行 SQL 任務。
  • DataGen: 數據生成器。容器啟動后會自動開始生成用戶行為數據,並發送到 Kafka 集群中。默認每秒生成 2000 條數據,能持續生成一個多小時。也可以更改 docker-compose.yml 中 datagen 的 speedup 參數來調整生成速率(重啟 docker compose 才能生效)。
  • MySQL: 集成了 MySQL 5.7 ,以及預先創建好了類目表(category),預先填入了子類目與頂級類目的映射關系,后續作為維表使用。
  • Kafka: 主要用作數據源。DataGen 組件會自動將數據灌入這個容器中。
  • Zookeeper: Kafka 容器依賴。
  • Elasticsearch: 主要存儲 Flink SQL 產出的數據。
  • Kibana: 可視化 Elasticsearch 中的數據。

在啟動容器前,建議修改 Docker 的配置,將資源調整到 4GB 以及 4核。啟動所有的容器,只需要在 docker-compose.yml 所在目錄下運行如下命令。

docker-compose up -d

該命令會以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過 docker ps 來觀察上述的五個容器是否正常啟動了。 也可以訪問 http://localhost:5601/ 來查看 Kibana 是否運行正常。

另外可以通過如下命令停止所有的容器:

docker-compose down

進入 SQL CLI 客戶端

運行如下命令進入 SQL CLI 客戶端:

docker-compose exec sql-client ./sql-client.sh

該命令會在容器中啟動 SQL CLI 客戶端。你應該能在 CLI 客戶端中看到如下的環境界面。

使用 DDL 創建 Kafka 表

Datagen 容器在啟動后會往 Kafka 的 user_behavior topic 中持續不斷地寫入數據。數據包含了2017年11月27日一天的用戶行為(行為包括點擊、購買、加購、喜歡),每一行表示一條用戶行為,以 JSON 的格式由用戶ID、商品ID、商品類目ID、行為類型和時間組成。該原始數據集來自阿里雲天池公開數據集,特此鳴謝。

我們可以在 docker-compose.yml 所在目錄下運行如下命令,查看 Kafka 集群中生成的前10條數據。

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} ...

有了數據源后,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。

CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime AS PROCTIME(), -- generates processing-time attribute using computed column WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- defines watermark on ts column, marks ts as event-time attribute ) WITH ( 'connector' = 'kafka', -- using kafka connector 'topic' = 'user_behavior', -- kafka topic 'scan.startup.mode' = 'earliest-offset', -- reading from the beginning 'properties.bootstrap.servers' = 'kafka:9094', -- kafka broker address 'format' = 'json' -- the data format is json );

如上我們按照數據的格式聲明了 5 個字段,除此之外,我們還通過計算列語法和 PROCTIME() 內置函數聲明了一個產生處理時間的虛擬列。我們還通過 WATERMARK 語法,在 ts 字段上聲明了 watermark 策略(容忍5秒亂序), ts 字段因此也成了事件時間列。關於時間屬性以及 DDL 語法可以閱讀官方文檔了解更多:

在 SQL CLI 中成功創建 Kafka 表后,可以通過 show tables; 和 describe user_behavior; 來查看目前已注冊的表,以及表的詳細信息。我們也可以直接在 SQL CLI 中運行 SELECT * FROM user_behavior; 預覽下數據(按q退出)。

接下來,我們會通過三個實戰場景來更深入地了解 Flink SQL 。

統計每小時的成交量

使用 DDL 創建 Elasticsearch 表

我們先在 SQL CLI 中創建一個 ES 結果表,根據場景需求主要需要保存兩個數據:小時、成交量。

CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', -- using elasticsearch connector 'hosts' = 'http://elasticsearch:9200', -- elasticsearch address 'index' = 'buy_cnt_per_hour' -- elasticsearch index name, similar to database table name );

我們不需要在 Elasticsearch 中事先創建 buy_cnt_per_hour 索引,Flink Job 會自動創建該索引。

提交 Query

統計每小時的成交量就是每小時共有多少 "buy" 的用戶行為。因此會需要用到 TUMBLE 窗口函數,按照一小時切窗。然后每個窗口分別統計 "buy" 的個數,這可以通過先過濾出 "buy" 的數據,然后 COUNT(*) 實現。
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

這里我們使用 HOUR 內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了 INSERT INTO將 query 的結果持續不斷地插入到上文定義的 es 結果表中
(可以將 es 結果表理解成 query 的物化視圖)。
另外可以閱讀該文檔了解更多關於窗口聚合的內容:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#group-windows

 在 Flink SQL CLI 中運行上述查詢后,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,因此會一直運行。

 

可以看到凌晨是一天中成交量的低谷。

使用 Kibana 可視化結果

我們已經通過 Docker Compose 啟動了 Kibana 容器,可以通過 http://localhost:5601 訪問 Kibana。首先我們需要先配置一個 index pattern。點擊左側工具欄的 "Management",就能找到 "Index Patterns"。點擊 "Create Index Pattern",然后通過輸入完整的索引名 "buy_cnt_per_hour" 創建 index pattern。創建完成后, Kibana 就知道了我們的索引,我們就可以開始探索數據了。

先點擊左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛創建的索引中的內容。

 

 

 

 接下來,我們先創建一個 Dashboard 用來展示各個可視化的視圖。點擊頁面左側的"Dashboard",創建一個名為 ”用戶行為日志分析“ 的Dashboard。然后點擊 "Create New" 創建一個新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,並保存為”每小時成交量“。

 

 

 

統計一天每10分鍾累計獨立用戶數

另一個有意思的可視化是統計一天中每一刻的累計獨立用戶數(uv),也就是每一刻的 uv 數都代表從0點到當前時刻為止的總計 uv 數,因此該曲線肯定是單調遞增的。

我們仍然先在 SQL CLI 中創建一個 Elasticsearch 表,用於存儲結果匯總數據。主要字段有:日期時間和累積 uv 數。我們將日期時間作為 Elasticsearch 中的 document id,便於更新該日期時間的 uv 值。

CREATE TABLE cumulative_uv ( date_str STRING, time_str STRING, uv BIGINT, PRIMARY KEY (date_str, time_str) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://elasticsearch:9200', 'index' = 'cumulative_uv' );

為了實現該曲線,我們先抽取出日期和時間字段,我們使用 DATE_FORMAT 抽取出基本的日期與時間,再用 SUBSTR 和 字符串連接函數 || 將時間修正到10分鍾級別,如: 12:1012:20。其次,我們在外層查詢上基於日期分組,求當前最大的時間,和 UV,寫入到 Elasticsearch 的索引中。UV 的統計我們通過內置的 COUNT(DISTINCT user_id)來完成,Flink SQL 內部對 COUNT DISTINCT 做了非常多的優化,因此可以放心使用。

這里之所以需要求最大的時間,同時又按日期+時間作為主鍵寫入到 Elasticsearch,是因為我們在計算累積 UV 數。

INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv FROM ( SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str, user_id FROM user_behavior) GROUP BY date_str;

提交上述查詢后,在 Kibana 中創建 cumulative_uv 的 index pattern,然后在 Dashboard 中創建一個"Line"折線圖,選擇 cumulative_uv 索引,按照如下截圖中的配置(左側)畫出累計獨立用戶數曲線,並保存。

 

 

 

 

頂級類目排行榜

最后一個有意思的可視化是類目排行榜,從而了解哪些類目是支柱類目。不過由於源數據中的類目分類太細(約5000個類目),對於排行榜意義不大,因此我們希望能將其歸約到頂級類目。所以筆者在 mysql 容器中預先准備了子類目與頂級類目的映射數據,用作維表。

在 SQL CLI 中創建 MySQL 表,后續用作維表查詢。

CREATE TABLE category_dim ( sub_category_id BIGINT, parent_category_name STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysql:3306/flink', 'table-name' = 'category', 'username' = 'root', 'password' = '123456', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '10min' );

同時我們再創建一個 Elasticsearch 表,用於存儲類目統計結果。

CREATE TABLE top_category ( category_name STRING PRIMARY KEY NOT ENFORCED, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://elasticsearch:9200', 'index' = 'top_category' );


第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 將該查詢注冊成一個視圖,簡化邏輯。維表關聯使用 temporal join 語法,
可以查看文檔了解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table

CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id;


最后根據 類目名稱分組,統計出 buy 的事件數,並寫入 Elasticsearch 中。
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name;



提交上述查詢后,在 Kibana 中創建 top_category 的 index pattern,然后在 Dashboard 中創建一個"Horizontal Bar"條形圖,
選擇 top_category 索引,按照如下截圖中的配置(左側)畫出類目排行榜,並保存。

 

 

 
        

可以看到服飾鞋包的成交量遠遠領先其他類目。

Kibana 還提供了非常豐富的圖形和可視化選項,感興趣的用戶可以用 Flink SQL 對數據進行更多維度的分析,並使用 Kibana 展示出可視化圖,並觀測圖形數據的實時變化。

 

結尾

在本文中,我們展示了如何使用 Flink SQL 集成 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。整個過程無需一行 Java/Scala 代碼,使用 SQL 純文本即可完成。期望通過本文,可以讓讀者了解到 Flink SQL 的易用和強大,包括輕松連接各種外部系統、對事件時間和亂序數據處理的原生支持、維表關聯、豐富的內置函數等等。希望你能喜歡我們的實戰演練,並從中獲得樂趣和知識!

 

本文章轉自:https://github.com/wuchong/flink-sql-demo#%E5%87%86%E5%A4%87

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM