Flink SQL結合Kafka、Elasticsearch、Kibana實時分析電商用戶行為


使用Flink SQL結合Kafka、Elasticsearch、Kibana實時分析電商用戶行為 (Use flink sql to combine kafka, elasticsearch and kibana, real-time analysis of e-commerce user behavior.)

Flink與其它實時計算工具區別之一是向用戶提供了更多抽象易用的API,比如讀寫各類程序的connector接口、Table API和SQL,從數據加載、計算、一直到輸出,所有操作都可以使用SQL完成,大大減少了開發量和維護成本,本文將通過實時分析電商用戶行為數據介紹flink sql的使用,分析的內容如下:

  1. 分析每10分鍾累計在線用戶數;
  2. 分析每小時購買量;
  3. 分析top瀏覽商品類目(瀏覽的商品歸屬於那個類目);

1 最終實時分析kibana展現效果

2 流程和版本信息

  • kafka --> flink --> es -->kibana

數據采集存儲到kafka,通過flink消費kafka數據,實時計算,結果存儲到es,最后通過kibana展現。

版本信息
flink 1.12.1、kafka_2.13-2.7.0、elasticsearch 7.10.1、kibana 7.10.1

3 數據結構

電商用戶行為分析共涉及3個表,商品類目信息表、商品類目信息表、用戶行為信息表,其中用戶行為信息表共5個列:用戶ID、商品ID、商品類目ID、行為類型、時間戳;

4 kafka數據

./kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9092 --from-beginning --max-messages 5

1,2268318,2520377,pv,1511544070
1,2333346,2520771,pv,1511561733

數據來源於淘寶開放的用戶行為數據UserBehavior,數據格式為csv,以逗號分隔;

2 使用Flink SQL建表讀取kafka數據

現在數據已經存儲在kafka,進入flink sql client,

創建消費kafka數據表;

CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, app_time BIGINT, ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')), proctime AS PROCTIME(), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', --使用kafka connector 'topic' = 'user_behavior', --kafka topic 'scan.startup.mode' = 'earliest-offset', --從topic最開始處開始消費 'properties.bootstrap.servers'='localhost:9092', --kafka broker地址 'properties.group.id' = 'test-group03', 'format' = 'csv', --存儲在kafka的數據格式為csv 'csv.field-delimiter'=',' --數據分隔符 ); 
  • WATERMARK 定義處理混亂次序的事件時間屬性,每5秒觸發一次window
  • PROCTIME 是內置函數,產生一個虛擬的Processing Time列,偶爾會用到
  • WITH 里定義kafka連接信息和屬性
  • 由於事件時間格式為bigint,在sql中將其轉為timestamp

3 分析場景

3.1 場景1:分析每10分鍾累計在線用戶數

最終的分析結果數據會寫入es,首先創建es index和寫入es的表;

CREATE TABLE cumulative_uv (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'cumulative_uv'
);
  • WITH 里面定義es連接信息和屬性

分析每10分鍾在線用戶數只需要知道日期(date_str)、時間(time_str)、數量(uv)即可;上面已經定義了消費kafka數據的表 user_behavior,現在查詢該表,並將數據寫入es;

INSERT INTO cumulative_uv SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv FROM ( SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str, user_id FROM user_behavior) GROUP BY date_str;

由於分析跨度為每10分鍾,在sql 內層查詢中使用 SUBSTR 截取事件小時和分鍾字符,拼湊成每10分鍾的數據,比如: 12:10,12:20。提交sql后,flink會將sql以流作業方式按照設定的WATERMARK和窗口提交到集群運行;

現在查詢kibina可以看到數據已經實時寫入.

3.2 場景2:分析每小時購買量

創建es index和寫入es的表;

CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'buy_cnt_per_hour' );

查詢 user_behavior 表,將數據寫入es;

INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior='buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
  • HOUR 為內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值
  • TUMBLE 為窗口函數,按設定的時間切窗

首先通過(behavior='buy') 過濾出購買數據,再通過窗口函數(TUMBLE)按一小時切窗,統計出每小時共有多少"buy"的用戶行為。

3.3 場景3:分析top瀏覽商品類目

由於kafka數據存儲的是商品id,商品id對應的商品類目名稱存儲在mysql數據庫,需先創建連接mysql的數據表;

CREATE TABLE category ( category_id BIGINT, category_name STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'category', 'username' = 'sywu', 'password' = 'sywu', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '10min' );

為了后續查詢方便,創建kafka數據表和mysql數據表關聯視圖;

CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, case when C.category_name is null then 'other' else C.category_name end as category_name FROM user_behavior AS U LEFT JOIN category FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id;

現在 kafka 數據表和 mysql數據表通過視圖表 rich_user_behavior 關聯在一起;分析top瀏覽商品類目只需要知道商品類目名和瀏覽數即可,所以在此創建一張包含商品類目名和瀏覽數的表;

CREATE TABLE top_category ( category_name STRING PRIMARY KEY NOT ENFORCED, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'top_category' );

查詢視圖表 rich_user_behavior表,過濾分組統計數據;

INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior='buy' GROUP BY category_name;

到此3個分析需求實現,作業正常實時運行。 

4 總結

通過Flink 提供的Table API和SQL,以及處理數據的窗口、讀寫各類程序的connector接口和函數,使用上面的SQL DML操作,flink即實現了用戶行為數據的實時分析需求;從開發角度看,代碼量和開發難度大大降低;從維護角度看,維護成本也大大降低。

參考文獻


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM