Flink 1.10.0 於近期剛發布,釋放了許多令人激動的新特性。尤其是 Flink SQL 模塊,發展速度非常快,因此本文特意從實踐的角度出發,帶領大家一起探索使用 Flink SQL 如何快速構建流式應用。
本文將基於 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建一個電商用戶行為的實時分析應用。本文所有的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。本實戰演練的最終效果圖:
准備
一台裝有 Docker 和 Java8 的 Linux 或 MacOS 計算機。
使用 Docker Compose 啟動容器
本實戰演示所依賴的組件全都編排到了容器中,因此可以通過 docker-compose
一鍵啟動。你可以通過 wget
命令自動下載該 docker-compose.yml
文件,也可以手動下載。
該 Docker Compose 中包含的容器有:
- DataGen: 數據生成器。容器啟動后會自動開始生成用戶行為數據,並發送到 Kafka 集群中。默認每秒生成 1000 條數據,持續生成約 3 小時。也可以更改
docker-compose.yml
中 datagen 的speedup
參數來調整生成速率(重啟 docker compose 才能生效)。 - MySQL: 集成了 MySQL 5.7 ,以及預先創建好了類目表(
category
),預先填入了子類目與頂級類目的映射關系,后續作為維表使用。 - Kafka: 主要用作數據源。DataGen 組件會自動將數據灌入這個容器中。
- Zookeeper: Kafka 容器依賴。
- Elasticsearch: 主要存儲 Flink SQL 產出的數據。
- Kibana: 可視化 Elasticsearch 中的數據。
在啟動容器前,建議修改 Docker 的配置,將資源調整到 4GB 以及 4核。啟動所有的容器,只需要在 docker-compose.yml
所在目錄下運行如下命令。
該命令會以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過 docker ps
來觀察上述的五個容器是否正常啟動了。 也可以訪問 http://localhost:5601/ 來查看 Kibana 是否運行正常。
另外可以通過如下命令停止所有的容器:
下載安裝 Flink 本地集群
我們推薦用戶手動下載安裝 Flink,而不是通過 Docker 自動啟動 Flink。因為這樣可以更直觀地理解 Flink 的各個組件、依賴、和腳本。
- 下載 Flink 1.10.0 安裝包並解壓(解壓目錄
flink-1.10.0
):https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz - 進入 flink-1.10.0 目錄:
cd flink-1.10.0
-
通過如下命令下載依賴 jar 包,並拷貝到
lib/
目錄下,也可手動下載和拷貝。因為我們運行時需要依賴各個 connector 實現。 -
-P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar - 將
conf/flink-conf.yaml
中的taskmanager.numberOfTaskSlots
修改成 10,因為我們會同時運行多個任務。 - 執行
./bin/start-cluster.sh
,啟動集群。
運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。並且可以看到可用 Slots 數為 10 個。
- 執行
bin/sql-client.sh embedded
啟動 SQL CLI。便會看到如下的松鼠歡迎界面。
使用 DDL 創建 Kafka 表
Datagen 容器在啟動后會往 Kafka 的 user_behavior
topic 中持續不斷地寫入數據。數據包含了2017年11月27日一天的用戶行為(行為包括點擊、購買、加購、喜歡),每一行表示一條用戶行為,以 JSON 的格式由用戶ID、商品ID、商品類目ID、行為類型和時間組成。該原始數據集來自阿里雲天池公開數據集,特此鳴謝。
我們可以在 docker-compose.yml
所在目錄下運行如下命令,查看 Kafka 集群中生成的前10條數據。
有了數據源后,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。
如上我們按照數據的格式聲明了 5 個字段,除此之外,我們還通過計算列語法和 PROCTIME()
內置函數聲明了一個產生處理時間的虛擬列。我們還通過 WATERMARK 語法,在 ts 字段上聲明了 watermark 策略(容忍5秒亂序), ts 字段因此也成了事件時間列。關於時間屬性以及 DDL 語法可以閱讀官方文檔了解更多:
- 時間屬性:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
- DDL:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
在 SQL CLI 中成功創建 Kafka 表后,可以通過 show tables;
和 describe user_behavior;
來查看目前已注冊的表,以及表的詳細信息。我們也可以直接在 SQL CLI 中運行 SELECT * FROM user_behavior;
預覽下數據(按q
退出)。
接下來,我們會通過三個實戰場景來更深入地了解 Flink SQL 。
統計每小時的成交量
使用 DDL 創建 Elasticsearch 表
我們先在 SQL CLI 中創建一個 ES 結果表,根據場景需求主要需要保存兩個數據:小時、成交量。
我們不需要在 Elasticsearch 中事先創建 buy_cnt_per_hour
索引,Flink Job 會自動創建該索引。
提交 Query
統計每小時的成交量就是每小時共有多少 "buy" 的用戶行為。因此會需要用到 TUMBLE 窗口函數,按照一小時切窗。然后每個窗口分別統計 "buy" 的個數,這可以通過先過濾出 "buy" 的數據,然后 COUNT(*)
實現。
這里我們使用 HOUR
內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了 INSERT INTO
將 query 的結果持續不斷地插入到上文定義的 es 結果表中(可以將 es 結果表理解成 query 的物化視圖)。另外可以閱讀該文檔了解更多關於窗口聚合的內容:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
在 Flink SQL CLI 中運行上述查詢后,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,因此會一直運行。
使用 Kibana 可視化結果
我們已經通過 Docker Compose 啟動了 Kibana 容器,可以通過 http://localhost:5601 訪問 Kibana。首先我們需要先配置一個 index pattern。點擊左側工具欄的 "Management",就能找到 "Index Patterns"。點擊 "Create Index Pattern",然后通過輸入完整的索引名 "buy_cnt_per_hour" 創建 index pattern。創建完成后, Kibana 就知道了我們的索引,我們就可以開始探索數據了。
先點擊左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛創建的索引中的內容。
接下來,我們先創建一個 Dashboard 用來展示各個可視化的視圖。點擊頁面左側的"Dashboard",創建一個名為 ”用戶行為日志分析“ 的Dashboard。然后點擊 "Create New" 創建一個新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,並保存為”每小時成交量“。
可以看到凌晨是一天中成交量的低谷。
統計一天每10分鍾累計獨立用戶數
另一個有意思的可視化是統計一天中每一刻的累計獨立用戶數(uv),也就是每一刻的 uv 數都代表從0點到當前時刻為止的總計 uv 數,因此該曲線肯定是單調遞增的。
我們仍然先在 SQL CLI 中創建一個 Elasticsearch 表,用於存儲結果匯總數據。主要有兩個字段:時間和累積 uv 數。
為了實現該曲線,我們可以先通過 OVER WINDOW 計算出每條數據的當前分鍾,以及當前累計 uv(從0點開始到當前行為止的獨立用戶數)。 uv 的統計我們通過內置的 COUNT(DISTINCT user_id)
來完成,Flink SQL 內部對 COUNT DISTINCT 做了非常多的優化,因此可以放心使用。
這里我們使用 SUBSTR
和 DATE_FORMAT
還有 ||
內置函數,將一個 TIMESTAMP 字段轉換成了 10分鍾單位的時間字符串,如: 12:10
, 12:20
。關於 OVER WINDOW 的更多內容可以參考文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations
我們還使用了 CREATE VIEW 語法將 query 注冊成了一個邏輯視圖,可以方便地在后續查詢中對該 query 進行引用,這有利於拆解復雜 query。注意,創建邏輯視圖不會觸發作業的執行,視圖的結果也不會落地,因此使用起來非常輕量,沒有額外開銷。由於 uv_per_10min
每條輸入數據都產生一條輸出數據,因此對於存儲壓力較大。我們可以基於 uv_per_10min
再根據分鍾時間進行一次聚合,這樣每10分鍾只有一個點會存儲在 Elasticsearch 中,對於 Elasticsearch 和 Kibana 可視化渲染的壓力會小很多。
提交上述查詢后,在 Kibana 中創建 cumulative_uv
的 index pattern,然后在 Dashboard 中創建一個"Line"折線圖,選擇 cumulative_uv
索引,按照如下截圖中的配置(左側)畫出累計獨立用戶數曲線,並保存。
頂級類目排行榜
最后一個有意思的可視化是類目排行榜,從而了解哪些類目是支柱類目。不過由於源數據中的類目分類太細(約5000個類目),對於排行榜意義不大,因此我們希望能將其歸約到頂級類目。所以筆者在 mysql 容器中預先准備了子類目與頂級類目的映射數據,用作維表。
在 SQL CLI 中創建 MySQL 表,后續用作維表查詢。
同時我們再創建一個 Elasticsearch 表,用於存儲類目統計結果。
第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 將該查詢注冊成一個視圖,簡化邏輯。維表關聯使用 temporal join 語法,可以查看文檔了解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
最后根據 類目名稱分組,統計出 buy
的事件數,並寫入 Elasticsearch 中。
提交上述查詢后,在 Kibana 中創建 top_category
的 index pattern,然后在 Dashboard 中創建一個"Horizontal Bar"條形圖,選擇 top_category
索引,按照如下截圖中的配置(左側)畫出類目排行榜,並保存。
可以看到服飾鞋包的成交量遠遠領先其他類目。
到目前為止,我們已經完成了三個實戰案例及其可視化視圖。現在可以回到 Dashboard 頁面,對各個視圖進行拖拽編排,讓我們的 Dashboard 看上去更加正式、直觀(如本文開篇效果圖)。當然,Kibana 還提供了非常豐富的圖形和可視化選項,而用戶行為數據中也有很多有意思的信息值得挖掘,感興趣的讀者可以用 Flink SQL 對數據進行更多維度的分析,並使用 Kibana 展示更多可視化圖,並觀測圖形數據的實時變化。
結尾
在本文中,我們展示了如何使用 Flink SQL 集成 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。整個過程無需一行 Java/Scala 代碼,使用 SQL 純文本即可完成。期望通過本文,可以讓讀者了解到 Flink SQL 的易用和強大,包括輕松連接各種外部系統、對事件時間和亂序數據處理的原生支持、維表關聯、豐富的內置函數等等。希望你能喜歡我們的實戰演練,並從中獲得樂趣和知識!
查看更多:https://yqh.aliyun.com/detail/6404?utm_content=g_1000105579
上雲就看雲棲號:更多雲資訊,上雲案例,最佳實踐,產品入門,訪問:https://yqh.aliyun.com/