Flink系列之1.10版流式SQL應用


  隨着Flink 1.10的發布,對SQL的支持也非常強大。Flink 還提供了 MySql, Hive,ES, Kafka等連接器Connector,所以使用起來非常方便。

  接下來咱們針對構建流式SQL應用文章的梗概如下:

  1. 搭建流式SQL應用所需要的環境准備。

  2. 構建一個按每小時進行統計購買量的應用。

  3. 構建每天以10分鍾的粒度進行統計應用。

  4. 構建按分類進行排行,取出想要的結果應用。

 

1. 搭建流式應用所需要的環境准備

   Kafka,用於將數據寫入到Kafka中,然后Flink通過讀取Kafka的數據然后再進行處理。版本號:2.11。

     MySQL, 用於保存數據的分類。Flink從中讀取分類進行處理和計算 。版本號:8.0.15。

   ElasticSearch, 用於保存結果數據和進行索引存儲。下載的時候可以在搜索引擎里邊搜索“elasticsearch 國內”,這樣就可以從國內快速下載,要不然下載的太慢了。版本號:7.6.0。

   Kibana, 用於ES的結果展示,圖形化的界面美觀。 下載的時候也需要搜索“Kibana 國內”,比較快速。版本號:7.6.0。

     Flink, 核心的流處理程序,版本號:1.10。Flink支持國內鏡像下載,這個到時候可以自行找一下。

     Zookeeper,  Kafka依賴這個應用,所以也會用到的,這個什么版本都是可以的。我的版本號:3.4.12。

   當然我的是mac電腦,如果是mac電腦的話,下載ES和Kibana的時候要下載文件中帶“darwin”字樣的,可以在Mac中使用其他的不能執行。應該是程序里邊的編譯不同,這個也是一個小坑。

     因為Flink需要連接Mysql, Elasticseratch , Kafka,所以也需要提前下載Flink所需要的Connector jar包到Flink的lib里邊。

wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
    wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

  

   環境都准備好了,那么需要把環境都啟動起來,進行檢查。

   Elasticsearch啟動好了之后需要訪問這個網址沒有問題,說明成功了:http://localhost:9200/_cluster/health?pretty

   Flink啟動好之后需要訪問 http://localhost:8081 會有界面展示。

   Kibana 啟動好了之后訪問:http://127.0.0.1:5601/ 會有界面展示。當然Kibana在目錄conf/kibana.yml里邊需要把ES的地址給打開。

   Zookeeper 這個相信很多同學都會配置了,如果有不會配置的,可以自己搜索一下。

   我們先看一下最后的效果圖,可能不是特別好,是這么個意思。

 

 

 

 

 

 

 

2. 構建一個按每個小時統計購買量應用。 

 

  我們寫一個程序,往Kafka里邊寫數據,模擬一些連續的數據源頭。

  首先定義一個Pojo類。

package myflink.pojo;

public class UserBehavior {
    //用戶ID
    public long userId;
    //商品ID
    public long itemId;
    //商品類目ID
    public  int categoryId;
    //用戶行為,包括{"pv","buy","cart", "fav"}
    public String behavior;
    //行為發生的時間戳,單位秒
    public String ts;

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public long getItemId() {
        return itemId;
    }

    public void setItemId(long itemId) {
        this.itemId = itemId;
    }

    public int getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(int categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public String getTimestamp() {
        return ts;
    }

    public void setTimestamp(String ts) {
        this.ts = ts;
    }
}

  接着寫一個往Kafka寫數據的類。隨機生成用於的行為,里邊包括用戶的id,類目id等。讓程序運行起來。

package myflink.kafka;

import com.alibaba.fastjson.JSON;
import myflink.pojo.UserBehavior;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2020-03-15
 */
public class KafkaWriter {

    //本地的kafka機器列表
    public static final String BROKER_LIST = "localhost:9092";
    //kafka的topic
    public static final String TOPIC_USER_BEHAVIOR = "user_behaviors";
    //key序列化的方式,采用字符串的形式
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //value的序列化的方式
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    private static final String[] BEHAVIORS = {"pv","buy","cart", "fav"};

    private static KafkaProducer<String, String> producer;

    public static void writeToKafka() throws Exception{


        //構建userBehavior, 數據都是隨機產生的
        int randomInt = RandomUtils.nextInt(0, 4);
        UserBehavior userBehavior = new UserBehavior();
        userBehavior.setBehavior(BEHAVIORS[randomInt]);
        Long ranUserId = RandomUtils.nextLong(1, 10000);
        userBehavior.setUserId(ranUserId);
        int ranCate = RandomUtils.nextInt(1, 100);
        userBehavior.setCategoryId(ranCate);
        Long ranItemId = RandomUtils.nextLong(1, 100000);
        userBehavior.setItemId(ranItemId);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        userBehavior.setTimestamp(sdf.format(new Date()));

        //轉換為json
        String userBehaviorStr = JSON.toJSONString(userBehavior);

        //包裝成kafka發送的記錄
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_BEHAVIOR, null,
                null, userBehaviorStr);
        //發送到緩存
        producer.send(record);
        System.out.println("向kafka發送數據:" + userBehaviorStr);
        //立即發送
        producer.flush();

    }

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        producer = new KafkaProducer<>(props);

        while(true) {
            try {
                //每一秒寫一條數據
                TimeUnit.SECONDS.sleep(1);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}

  本地idea Console 輸出的結果是這樣的:

 

    向kafka發送數據:{"behavior":"buy","categoryId":7,"itemId":75902,"timestamp":"2020-03-15T11:35:11Z","ts":"2020-03-15T11:35:11Z","userId":4737}

  我們將Flink的任務數調整成10個,也就是同時執行的任務數。 位置在 conf/flink-conf.yaml,taskmanager.numberOfTaskSlots: 10,然后重啟下。我的已經啟動並且運行了3個任務,看下圖:

 

 

   我們接下來運行Flink 內置的客戶端。命令: bin/sql-client.sh embedded,這樣我們就開始了Flink SQL之旅了。我們使用Flink的DDL,從Kafka里邊讀取數據,采用ProcessingTime的時間事件進行處理,為ts設置水位線,允許5秒延遲。更多參考 時間屬性Flink DDL。里邊的Kafka 連接以及相關的配置,相信大家都不是很陌生。

CREATE TABLE user_behavior (
    userId BIGINT,
    itemId BIGINT,
    categoryId BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 通過計算列產生一個處理時間列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定義watermark,ts成為事件時間列
) WITH (
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
    'connector.topic' = 'user_behaviors',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 從起始 offset 開始讀取
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
    'format.type' = 'json'  -- 數據源格式為 json
);

  接下來我們使用select來看一下Flink的數據,執行語句:select * from user_behavior,會出現如下圖。同時SQL上面還支持 show tables、describe user_behavior 等操作。

 

 

   我們需要將結果放入Elasticsearch,這樣也比較簡單,我們還通過DDL來創建一個表。我們只需要一個語句,就可以實現連接Elasticsearch(后邊簡稱ES)並且創建相應的Type和Index了。不需要自己再去創建一次,是不是很簡單,哈。里邊有兩個字段,一個是每天的小時數,一個是購買的統計量。當有數據寫入這個表的時候,那么就會將數據寫入到ES上,非常方便。

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector
    'connector.version' = '6',  -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本
    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch 地址
    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch 索引名,相當於數據庫的表名
    'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當於數據庫的庫名
    'connector.bulk-flush.max-actions' = '1',  -- 每條數據都刷新
    'format.type' = 'json',  -- 輸出數據格式 json
    'update-mode' = 'append'
);

  每個小時的購買量,那么我們需要的是使用滾動窗口,Tumbling Window,那么使用TUMBLE_START函數,另外我們還需要獲取ts中的小時數,那么需要使用HOUR函數。將所有behavior為buy的寫入到這個表中。

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

  這個時候看Flink里邊的任務中會出現這個任務,因為是持續不斷的進行處理的。執行過程中如果有數據的話,那么會將數據寫到表 buy_cnt_per_hour,同時也會將數據寫到ES里邊。

 

 

  下面我們來配置一下Kinbana來將結果進行展示,訪問 http://localhost:5601, 然后選擇左邊菜單的“Management”,然后選擇 “Index Patterns” -> “Create Index Pattern”, 輸入我們剛才創建的Index: “buy_cnt_per_hour”。可以通過左側的“Discover”按鈕就可以看到我們的數據了。

  

 

 

   我們繼續點擊左側的“Dashboard”按鈕,創建一個“用戶行為日志分析”的Dashboard。 進入左側的 “Visualize” - “Create Visualization" 選擇“Area”圖,Bucket的按我下邊截圖左下進行配置和選擇。

 

 

  保存后添加到Dashboard即可。這樣就從數據源頭到數據展示就構建完成了,是不是很快~

 

3. 構建每天以10分鍾的粒度進行統計獨立用戶數應用。

  

  我們繼續使用DDL創建Flink的表以及對應的ES的Index。

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

  創建好了需要將ts進行分解出來小時和分鍾,通過一個視圖,這個視圖和數據庫的視圖類似,不存儲數據,也不占用Flink的執行Task。首先將ts格式化,然后轉換成時間:小時:分鍾,分鍾后邊沒有0,結尾需要補個0。然后統計不同的用戶數需要使用DISTINCT函數和COUNT函數。還有使用Over Window功能,也就是從之前的數據到現在,以處理時間升序把數據按Window的功能來進行統計。直白的將就是有一條數據的話就會將數據處理, 然后有一條數據比當前最大值大的話會保留最大值。當前窗口是以每10分鍾為一個窗口。

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT userId) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

  這個視圖主要是數據比較多,只需要每10分鍾一個點其實就滿足要求了,那么現在我們需要做的就是再將數據處理一下即可寫入ES。

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

  這樣ES里邊就會有新的index產生,下一步我們在kibana里邊創建一個 index pattern, 輸入index “cumulative_uv”,接下來到 “Visualize”里邊創建一個 Visualization ,名為“累計獨立用戶數”,表選擇“Line”類型的圖標,其他指標和我下圖配置的一樣即可。

  

 

 

 

  累計獨立用戶數也創建好了。

 

4.   構建按分類進行排行,取出想要的結果應用。

    接下來我們需要按主類目進行統計和排序。因為子類目非常多。

  首先我們需要准備一個mysql, 然后創建好表。簡單些幾條對應的類目關系,當然可以根據自己所生成的數據進行自行寫入一些對應的關系表。

create table category (
    sub_category_id bigint(20),
    parent_category_id bigint(20)
);
insert into category(sub_category_id, parent_category_id) values(1038, 1);
insert into category(sub_category_id, parent_category_id) values(91244, 1);
insert into category(sub_category_id, parent_category_id) values(44712, 1);
insert into category(sub_category_id, parent_category_id) values(2,2);
insert into category(sub_category_id, parent_category_id) values(3,3);
insert into category(sub_category_id, parent_category_id) values(4,4);
insert into category(sub_category_id, parent_category_id) values(5,5);
insert into category(sub_category_id, parent_category_id) values(6,6);
insert into category(sub_category_id, parent_category_id) values(7,7);
insert into category(sub_category_id, parent_category_id) values(8,8);
insert into category(sub_category_id, parent_category_id) values(9,9);

  定義一個Flink表,數據從Mysql獲取,用於進行類目關系關聯。

CREATE TABLE category_dim (
    sub_category_id BIGINT,  -- 子類目
    parent_category_id BIGINT -- 頂級類目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

  創建ES的index,用於存儲統計后的結果。

CREATE TABLE top_category (
    category_name STRING,  -- 類目名稱
    buy_cnt BIGINT  -- 銷量
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

  接下來還是創建一個視圖,將表和類目關聯起來,方便后邊的統計結果。使用的是 Temporal Join

CREATE VIEW rich_user_behavior AS
SELECT U.userId, U.itemId, U.behavior,
  CASE C.parent_category_id
    WHEN 1 THEN '服飾鞋包'
    WHEN 2 THEN '家裝家飾'
    WHEN 3 THEN '家電'
    WHEN 4 THEN '美妝'
    WHEN 5 THEN '母嬰'
    WHEN 6 THEN '3C數碼'
    WHEN 7 THEN '運動戶外'
    WHEN 8 THEN '食品'
    ELSE '其他'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.categoryId = C.sub_category_id;

  將類型為“buy”的寫入到表,同時也就是寫入了ES里邊,然后ES里邊的index-top_category 也就有了數據了。

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

  我們繼續在Kibana里邊創建一個index pattern,輸入“top_category”,然后visualize里邊創建一個visualization 名為類目排行榜。詳細的配置可參考如下。

 

 

   好了整個的過程計算創建完了。

   

  通過使用Flink 1.10以及對應的Connector, 實現了對Mysql, Kafka, Elasticsearch 的快速連接,更快的達到的我們想要實現的效果。

  里邊涉及到往kafka里邊寫數據可參考工程:https://github.com/stonehqs/flink-demo

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM