Flink Real-Time Data Warehouse (DWM Layer)


Chapter 1 DWM and DWS Layer Design

1.1 Design Approach

  DWM (Data Warehouse Middle) is generally called the data middle layer. Building on the DWD layer, it applies light aggregation to the data to produce a set of intermediate tables, which improves the reusability of common metrics and reduces repeated processing. Intuitively, it aggregates along the common core dimensions and computes the corresponding statistics.

  Earlier we used stream splitting to break the data into separate Kafka topics. The next question is how to process that data, which means deciding exactly which metrics we need to compute in real time. Unlike offline processing, real-time computation carries very high development and operations costs, so we must weigh, against the actual situation, whether it is necessary to build a large, all-encompassing middle layer the way an offline warehouse does.

  If such a comprehensive layer is unnecessary, we roughly plan out the metrics that need real-time computation. Outputting these metrics as subject-area wide tables gives us the DWS layer.

1.2 Requirements Overview

| Subject | Metric                  | Output             | Computed from                                    | Source layer |
| ------- | ----------------------- | ------------------ | ------------------------------------------------ | ------------ |
| Visitor | pv                      | dashboard          | directly from page_log                           | dwd          |
| Visitor | uv                      | dashboard          | page_log, filtered and deduplicated              | dwm          |
| Visitor | bounce rate             | dashboard          | judged from page_log behavior                    | dwm          |
| Visitor | pages per visit         | dashboard          | needs the visit-start marker                     | dwd          |
| Visitor | visit duration          | dashboard          | needs the visit-start marker                     | dwd          |
| Product | click                   | multi-dim analysis | directly from page_log                           | dwd          |
| Product | favorite                | multi-dim analysis | favorites table                                  | dwd          |
| Product | add to cart             | multi-dim analysis | cart table                                       | dwd          |
| Product | order                   | dashboard          | order wide table                                 | dwm          |
| Product | payment                 | multi-dim analysis | payment wide table                               | dwm          |
| Product | refund                  | multi-dim analysis | refund table                                     | dwd          |
| Product | comment                 | multi-dim analysis | comment table                                    | dwd          |
| Region  | pv                      | multi-dim analysis | directly from page_log                           | dwd          |
| Region  | uv                      | multi-dim analysis | page_log, filtered and deduplicated              | dwm          |
| Region  | order                   | dashboard          | order wide table                                 | dwm          |
| Keyword | search keyword          | dashboard          | directly from the page view log                  | dwd          |
| Keyword | clicked-product keyword | dashboard          | re-aggregation of the product-subject order data | dws          |
| Keyword | ordered-product keyword | dashboard          | re-aggregation of the product-subject order data | dws          |

  Of course, a real project has many more requirements; here we mainly target real-time computation for visualization dashboards. What is the DWM layer's positioning? DWM mainly serves DWS: for some requirements, going straight from DWD to DWS involves a fair amount of computation, and those intermediate results are very likely to be reused by several DWS subjects, so part of the DWD layer is consolidated into a DWM layer. The business covered here:

  1) Visitor UV computation

  2) Bounce detail computation

  3) Order wide table

  4) Payment wide table

Chapter 2 DWM Layer: UV Computation

2.1 Requirements Analysis and Approach

  UV, short for Unique Visitor, is the count of distinct visitors. In real-time computation it can also be read as DAU (Daily Active User), since the real-time uv usually refers to the visitors of the current day.

  How do we recognize a day's visitors in the user behavior log? Three points:

    First, identify the first page each visitor opens, which marks the visitor entering our application

    Second, since a visitor can enter the application several times in one day, we must deduplicate within the day

    Third, on the following day, a returning user must count toward uv again

2.2 Deduplication Logic

  1) Use event-time semantics (to handle out-of-order data)

  2) Key by mid

  3) Add a window

  4) Filter out each user's first visit of the day (deduplication)

  5) Use flink state, and keep it for one day only. When should the state be cleared? When the current date no longer matches the date saved in the state!

  6) Write the day's first-visit records to the dwm layer (Kafka)

2.3 Implementation Code

  1) Add a constant to Constant

public static final String TOPIC_DWM_UV = "dwm_uv";

  2) DwmUvApp

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Author: 袁哥
 * Date: 2021/8/2 8:48
 */
public class DwmUvApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwmUvApp().init(
                3001,
                1,
                "DwmUvApp",
                "DwmUvApp",
                Constant.TOPIC_DWD_PAGE // read from the page log topic
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) {
        sourceStream
                .map(JSON::parseObject)    // parse each record into a JSONObject
                .assignTimestampsAndWatermarks( // assign watermarks
                        WatermarkStrategy
                                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((log, ts) -> log.getLong("ts"))
                )
                .keyBy(obj -> obj.getJSONObject("common").getString("mid")) // key by device id
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))   // 5-second tumbling event-time window
                .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                    ValueState<Long> firstVisitState;
                    SimpleDateFormat simpleDateFormat;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firstVisitState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("firstVisitState", Long.class));
                        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                    }

                    @Override
                    public void process(String s, Context context, Iterable<JSONObject> elements, Collector<JSONObject> out) throws Exception {
                        // when a new day starts, uv deduplication starts over
                        String today = simpleDateFormat.format(context.window().getEnd());  // use the window end time as "today"
                        String yesterday = simpleDateFormat.format(firstVisitState.value() == null ? 0L : firstVisitState.value()); // if firstVisitState is null, the user has not visited today

                        if (!today.equals(yesterday)){
                            firstVisitState.clear();
                        }

                        if (firstVisitState.value() == null){
                            List<JSONObject> list = YuangeCommonUtil.toList(elements);
                            JSONObject min = Collections.min(list, Comparator.comparing(o -> o.getLong("ts"))); // take the record with the earliest timestamp
                            out.collect(min);
                            firstVisitState.update(min.getLong("ts"));
                        }
                    }
                })
                .map(JSONAware::toJSONString)
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_UV));
    }
}
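
  The class above relies on two helpers built in earlier chapters: YuangeCommonUtil.toList and FlinkSinkUtil.getKafkaSink. For readers starting from this chapter, here is a minimal, hypothetical sketch of both; the broker list and the producer semantics are assumptions, not necessarily the original implementations.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// two separate files in the real project; shown together here for brevity
class YuangeCommonUtil {
    public static <T> List<T> toList(Iterable<T> it) {
        List<T> list = new ArrayList<>();
        it.forEach(list::add); // copy the window's elements so Collections.min can be applied
        return list;
    }
}

class FlinkSinkUtil {
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"); // assumed brokers
        props.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + ""); // must not exceed the broker's transaction.max.timeout.ms
        return new FlinkKafkaProducer<>(
                "default", // fallback topic; each record below carries its real topic
                (KafkaSerializationSchema<String>) (element, ts) ->
                        new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8)),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}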

2.4 Verify That Data Reaches the DWM Layer

  1) Start the Hadoop cluster

hadoop.sh start

  2) Start Zookeeper

zk start

  3) Start Kafka

kafka.sh start

  4) Start the log server (written earlier in the log-lg.sh script)

log-lg.sh start

  5) Start Flink in yarn-session mode

/opt/module/flink-yarn/bin/yarn-session.sh -d

  6) Update the realtime.sh script written earlier

#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
        com.yuange.flinkrealtime.app.dwm.DwmUvApp
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

  7) Run the script to submit the Flink jobs to the yarn-session (first upload the packaged jar to the expected location)

  8) After the jobs start, generate log data to simulate new activity

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  9) Optionally start a consumer on the dwm_uv topic

consume dwm_uv

Chapter 3 DWM Layer: Bounce Details

3.1 Requirements Analysis and Approach

3.1.1 What Is a Bounce

  A bounce is when a user lands on an entry page of the site (for example the home page) and then leaves without visiting any other page. Bounce rate formula: bounce rate = number of visits that leave the site after a single page / total number of visits.

  Watching the bounce rate per keyword shows how much users approve of the site's content, in other words whether the site is attractive to them. Whether the content actually helps and retains users also shows up directly in the bounce rate, so it is an important measure of content quality.

  Tracking the bounce rate shows whether incoming visitors are engaged quickly, lets you compare the quality of users acquired through different channels, and comparing it before and after an optimization shows the effect of the improvement.

3.1.2 How to Compute the Bounce Rate

  First we must recognize which behaviors are bounces, which means identifying the last page each bouncing visitor accessed. Two features matter:

  1) The page is the first page of the user's recent visit. This can be judged from whether the page has a previous page (last_page_id); if that field is empty, this is the first page of this visit.

  2) For a long period after the first visit (a duration you choose), the user accesses no other page.

    This second judgment is a bit tricky. It cannot be concluded from a single record: it requires a combined judgment using one record that exists and one that does not, deriving an existing record from the absence of another. Worse, the record is not absent forever, only within a certain time range. So how do we recognize a combined behavior with a time bound?

    The simplest way is Flink's built-in CEP library, which is very well suited to recognizing an event from a combination of multiple records.

  3) A user bounce is in essence the combination of a condition event and a timeout event

3.1.3 Implementation Code

  1) Make sure the CEP dependency is added

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>1.13.1</version>
</dependency>

  2) Use event time

  3) Key by mid: all of a user's behavior must be computed on the same mid

  4) Define the pattern: a first entry followed, within a timeout (the code below uses 5 s), by one or more page visits

  5) The records that time out are exactly the ones we want

  6) Test version of the code

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.common.Constant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/8/2 19:16
 */
public class DwmJumpDetailApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwmJumpDetailApp().init(
                3002,
                1,
                "DwmJumpDetailApp",
                "DwmJumpDetailApp",
                Constant.TOPIC_DWD_PAGE     // consume the page topic
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) {
        sourceStream =
                environment.fromElements(
                        "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                        "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":11000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000}",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"home\"},\"ts\":17000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"detail\"},\"ts\":50000} "
                );

        KeyedStream<JSONObject, String> stream = sourceStream.map(JSON::parseObject) // parse each record into a JSONObject
                .assignTimestampsAndWatermarks( // assign watermarks
                        WatermarkStrategy
                                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((log, ts) -> log.getLong("ts"))
                )
                .keyBy(obj -> obj.getJSONObject("common").getString("mid"));// key by mid

        // define the pattern: an entry followed closely by another page
        Pattern<JSONObject, JSONObject> pattern = Pattern
                .<JSONObject>begin("entry")
                .where(new SimpleCondition<JSONObject>() { // entry condition: the user has not come from another page
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String lastPageId = value.getJSONObject("page").getString("last_page_id");  // id of the previous page
                        return lastPageId == null || lastPageId.isEmpty();
                    }
                })
                .next("nextPage")
                // keep records whose page_id and last_page_id are both present (the user moved on from the previous page within the window)
                // if no such record arrives in time, the entry times out, which is exactly the result we want
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        JSONObject page = value.getJSONObject("page");
                        String page_id = page.getString("page_id");
                        String last_page_id = page.getString("last_page_id");
                        return page_id != null && last_page_id != null && !last_page_id.isEmpty();
                    }
                })
                .within(Time.seconds(5));// the match must complete within 5 seconds

        // apply the pattern to the stream
        PatternStream<JSONObject> ps = CEP.pattern(stream, pattern);

        // select the matched data (and the timed-out data)
        SingleOutputStreamOperator<JSONObject> normal = ps.select(
                new OutputTag<JSONObject>("timeout") {  //側輸出流,存放超時數據
                },
                new PatternTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject timeout(Map<String, List<JSONObject>> pattern,
                                              long timeoutTimestamp) throws Exception {
                        // timed-out data: exactly the bounce details
                        return pattern.get("entry").get(0); // take the JSON record matched by "entry"
                    }
                },
                new PatternSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                        return null;    // normal visits need not be returned
                    }
                }
        );

        normal.getSideOutput(new OutputTag<JSONObject>("timeout"){}).print("jump");
    }
}

  7) Upgraded version

    (1) Add a constant to Constant

public static final String TOPIC_DWM_USER_JUMP_DETAIL = "dwm_user_jump_detail";

    (2) DwmJumpDetailApp_Two

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/8/2 19:51
 * What characterizes a user bounce:
 *     1. The current page is an entry
 *         the previous page is empty
 *     2. After a while, no other page has been visited
 *         timeout
 */
public class DwmJumpDetailApp_Two extends BaseAppV1 {
    public static void main(String[] args) {
        new DwmJumpDetailApp_Two().init(
                3002,
                1,
                "DwmJumpDetailApp_Two",
                "DwmJumpDetailApp_Two",
                Constant.TOPIC_DWD_PAGE     // consume the page topic
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) {
        KeyedStream<JSONObject, String> stream = sourceStream.map(JSON::parseObject) // parse each record into a JSONObject
                .assignTimestampsAndWatermarks( // assign watermarks
                        WatermarkStrategy
                                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((log, ts) -> log.getLong("ts"))
                )
                .keyBy(obj -> obj.getJSONObject("common").getString("mid"));// key by mid

        // define the pattern: an entry followed closely by another page
        Pattern<JSONObject, JSONObject> pattern = Pattern
                .<JSONObject>begin("entry")
                // entry condition
                .where(new SimpleCondition<JSONObject>() { // entry condition: the user has not come from another page
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String lastPageId = value.getJSONObject("page").getString("last_page_id");  // id of the previous page
                        return lastPageId == null || lastPageId.isEmpty();
                    }
                })
                .next("nextPage")
                // the next record is again an entry: the previous entry had no follow-up page, i.e. it bounced
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String lastPageId = value.getJSONObject("page").getString("last_page_id");  // id of the previous page
                        return lastPageId == null || lastPageId.isEmpty();
                    }
                })
                .within(Time.seconds(5));// the match must complete within 5 seconds

        // apply the pattern to the stream
        PatternStream<JSONObject> ps = CEP.pattern(stream, pattern);

        // select the matched data (and the timed-out data)
        SingleOutputStreamOperator<JSONObject> normal = ps.select(
                new OutputTag<JSONObject>("timeout") {  //側輸出流,存放超時數據
                },
                new PatternTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject timeout(Map<String, List<JSONObject>> pattern,
                                              long timeoutTimestamp) throws Exception {
                        return pattern.get("entry").get(0); //從entry窗口中獲取JSON數據
                    }
                },
                new PatternSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject select(Map<String, List<JSONObject>> pattern) throws Exception {
                        return pattern.get("entry").get(0); //從entry窗口中獲取JSON數據
                    }
                }
        );

        normal.union(normal.getSideOutput(new OutputTag<JSONObject>("timeout"){}))  // union the timed-out records with the matched ones
                .map(JSONAware::toJSONString)
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_USER_JUMP_DETAIL));
    }
}

3.1.4 Testing

  1) Package the program and upload it to /opt/module/applog

  2) Stop the yarn-session

  3) Update the realtime.sh script

vim /home/atguigu/bin/realtime.sh
#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        #com.yuange.flinkrealtime.app.dwd.DwdLogApp
        #com.yuange.flinkrealtime.app.dwd.DwdDbApp
        #com.yuange.flinkrealtime.app.dwm.DwmUvApp
        com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

  4) Start the yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

  5) Run the realtime.sh script

realtime.sh

  6) Start a consumer on the dwm_user_jump_detail topic

consume dwm_user_jump_detail

  7) Generate mock log data

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  8) Check the consumer output

Chapter 4 DWM Layer: Order Wide Table

4.1 Requirements Analysis and Approach

  Orders are a key object of statistical analysis, with many dimensional requirements around them: user, region, product, category, brand, and so on. To make later computation easier and reduce joins between large tables, the real-time pipeline consolidates the order-related data into a single order wide table.

  So which data should be integrated with orders?

  As the diagram above shows, since we already split the data into fact data and dimension data, the fact data (green) flows into the Kafka stream (the DWD layer) while the dimension data (blue) goes into hbase for long-term storage. In the DWM layer we join the fact data with the dimension data to form the wide table. Two kinds of joins are involved: fact-to-fact and fact-to-dimension.

  1) Joining fact data with fact data is a stream-to-stream join.

  2) Joining fact data with dimension data is a lookup against an external data source from within the stream computation.

4.2 Joining Orders with Order Details

4.2.1 POJO Classes

  1) Order table POJO

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * Author: 袁哥
 * Date: 2021/8/2 20:58
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class OrderInfo {
    private Long id;
    private Long province_id;
    private String order_status;
    private Long user_id;
    private BigDecimal total_amount;
    private BigDecimal activity_reduce_amount;
    private BigDecimal coupon_reduce_amount;
    private BigDecimal original_total_amount;
    private BigDecimal feight_fee;
    private String expire_time;
    private String create_time;
    private String operate_time;
    private String create_date; // derived from the other fields
    private String create_hour;
    private Long create_ts;

    // hand-written setter so that create_ts also gets assigned
    public void setCreate_time(String create_time) throws ParseException {
        this.create_time = create_time;

        this.create_date = this.create_time.substring(0, 10); // date part
        this.create_hour = this.create_time.substring(11, 13); // hour part

        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.create_ts = sdf.parse(create_time).getTime();
    }
}

  2) Order detail table POJO

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:01
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderDetail {
    private Long id;
    private Long order_id;
    private Long sku_id;
    private BigDecimal order_price;
    private Long sku_num;
    private String sku_name;
    private String create_time;
    private BigDecimal split_total_amount;
    private BigDecimal split_activity_amount;
    private BigDecimal split_coupon_amount;
    private Long create_ts;
    // hand-written setter so that create_ts also gets assigned
    public void setCreate_time(String create_time) throws ParseException {
        this.create_time = create_time;
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.create_ts = sdf.parse(create_time).getTime();

    }
}

  3) Wide-table POJO after the join

package com.yuange.flinkrealtime.bean;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import static java.lang.Integer.parseInt;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:12
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderWide {
    private Long detail_id;
    private Long order_id;
    private Long sku_id;
    private BigDecimal order_price;
    private Long sku_num;
    private String sku_name;
    private Long province_id;
    private String order_status;
    private Long user_id;

    private BigDecimal total_amount;
    private BigDecimal activity_reduce_amount;
    private BigDecimal coupon_reduce_amount;
    private BigDecimal original_total_amount;
    private BigDecimal feight_fee;
    private BigDecimal split_feight_fee;
    private BigDecimal split_activity_amount;
    private BigDecimal split_coupon_amount;
    private BigDecimal split_total_amount;

    private String expire_time;
    private String create_time;
    private String operate_time;
    private String create_date; // derived from the other fields
    private String create_hour;

    private String province_name;// obtained by querying the dimension table
    private String province_area_code;
    private String province_iso_code;
    private String province_3166_2_code;

    private Integer user_age;
    private String user_gender;

    private Long spu_id;     // dimension data to be joined in
    private Long tm_id;
    private Long category3_id;
    private String spu_name;
    private String tm_name;
    private String category3_name;

    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);

    }

    public void mergeOrderInfo(OrderInfo orderInfo) {
        if (orderInfo != null) {
            this.order_id = orderInfo.getId();
            this.order_status = orderInfo.getOrder_status();
            this.create_time = orderInfo.getCreate_time();
            this.create_date = orderInfo.getCreate_date();
            this.create_hour = orderInfo.getCreate_hour();
            this.activity_reduce_amount = orderInfo.getActivity_reduce_amount();
            this.coupon_reduce_amount = orderInfo.getCoupon_reduce_amount();
            this.original_total_amount = orderInfo.getOriginal_total_amount();
            this.feight_fee = orderInfo.getFeight_fee();
            this.total_amount = orderInfo.getTotal_amount();
            this.province_id = orderInfo.getProvince_id();
            this.user_id = orderInfo.getUser_id();
        }
    }

    public void mergeOrderDetail(OrderDetail orderDetail) {
        if (orderDetail != null) {
            this.detail_id = orderDetail.getId();
            this.sku_id = orderDetail.getSku_id();
            this.sku_name = orderDetail.getSku_name();
            this.order_price = orderDetail.getOrder_price();
            this.sku_num = orderDetail.getSku_num();
            this.split_activity_amount = orderDetail.getSplit_activity_amount();
            this.split_coupon_amount = orderDetail.getSplit_coupon_amount();
            this.split_total_amount = orderDetail.getSplit_total_amount();
        }
    }

    public void setUser_age(String birthday){
        try {
            this.user_age = parseInt(birthday);
        } catch (Exception e) {
            try {
                long bir = new SimpleDateFormat("yyyy-MM-dd").parse(birthday).getTime();
                this.user_age = Math.toIntExact((System.currentTimeMillis() - bir) / 1000 / 60 / 60 / 24 / 365);
            } catch (ParseException e1) {
                e1.printStackTrace();
            }
        }
    }

    public String toJsonString(){
        return JSON.toJSONString(this);
    }
}

4.2.2 Join Code Listing

  1) Upgrade BaseApp to BaseAppV2

    The BaseApp wrapper from earlier can only consume one topic and produce one stream. This time we need to consume several topics and obtain several streams, so we refactor BaseApp: the init method and the abstract run method.

package com.yuange.flinkrealtime.app;

import com.yuange.flinkrealtime.util.FlinkSourceUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:13
 */
public abstract class BaseAppV2 {

    public void init(int port, int p, String ck, String groupId, String topic,String... otherTopics){
        System.setProperty("HADOOP_USER_NAME","atguigu");
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",port);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(p);

        environment.enableCheckpointing(5000);  // interval between checkpoints, in milliseconds
        environment.setStateBackend(new HashMapStateBackend()); // state backend, so checkpoint state is written to remote storage (HDFS)
        environment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/flinkparent/ck/" + ck);   // checkpoint storage path

        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpointing mode: exactly once
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);   // at most one checkpoint in flight at a time
        environment.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // retain externalized checkpoints when the job is cancelled

        HashSet<String> topics = new HashSet<>(Arrays.asList(otherTopics));
        topics.add(topic);

        Map<String, DataStreamSource<String>> streams = new HashMap<>();
        for (String t : topics) {

            DataStreamSource<String> stream = environment.addSource(FlinkSourceUtil.getKafkaSource(groupId, t));
            streams.put(t, stream);
        }

        run(environment,streams);

        try {
            environment.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    protected abstract void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams);
}
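
  BaseAppV2 (like BaseAppV1 before it) depends on FlinkSourceUtil.getKafkaSource from the earlier chapters. For reference, a minimal, hypothetical sketch follows; the broker list and offset behavior are assumptions:

package com.yuange.flinkrealtime.util;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkSourceUtil {
    public static FlinkKafkaConsumer<String> getKafkaSource(String groupId, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"); // assumed brokers
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level", "read_committed"); // only read data committed by exactly-once producers
        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        source.setStartFromLatest(); // offsets restored from a checkpoint still take precedence
        return source;
    }
}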

  2) Add constants to Constant

public static final String TOPIC_DWD_ORDER_INFO = "dwd_order_info";
public static final String TOPIC_DWD_ORDER_DETAIL = "dwd_order_detail";

  3) The join code

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderDetail;
import com.yuange.flinkrealtime.bean.OrderInfo;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.DimUtil;
import com.yuange.flinkrealtime.util.JdbcUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.time.Duration;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:18
 */
public class DwmOrderWideApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwmOrderWideApp().init(
                3003,
                1,
                "DwmOrderWideApp",
                "DwmOrderWideApp",
                Constant.TOPIC_DWD_ORDER_INFO, Constant.TOPIC_DWD_ORDER_DETAIL
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. join the fact tables
        SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator = factJoin(streams);
    }

    private SingleOutputStreamOperator<OrderWide> factJoin(Map<String, DataStreamSource<String>> streams) {
        KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
                .map(info -> JSON.parseObject(info, OrderInfo.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((info, ts) -> info.getCreate_ts())
                )
                .keyBy(OrderInfo::getId);

        KeyedStream<OrderDetail, Long> orderDetailStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
                .map(info -> JSON.parseObject(info, OrderDetail.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((detail, ts) -> detail.getCreate_ts())
                )
                .keyBy(OrderDetail::getOrder_id);

        return orderInfoStream.intervalJoin(orderDetailStream)
                .between(Time.seconds(-5),Time.seconds(5))
                .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo left,
                                               OrderDetail right,
                                               Context ctx,
                                               Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(left, right));
                    }
                });
    }
}

4.3 Joining Dimension Tables

  Dimension association is essentially looking up, from within the stream, tables stored in hbase. But even with primary-key lookups, querying hbase is slower than a stream-to-stream join. Queries against external data sources are often the performance bottleneck of streaming computation, so we will layer some optimizations on top of this baseline.

4.3.1 Initializing Dimension Data into HBase

  1) Start hbase

start-hbase.sh

  2) Start maxwell

maxwell.sh start

  3) Start the yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

  4) Run the realtime.sh script to start DwdDbApp

  5) Use maxwell's bootstrap to import the dimension data; six dimension tables are involved: user_info, base_province, sku_info, spu_info, base_category3, base_trademark

cd /opt/module/maxwell-1.27.1/
bin/maxwell-bootstrap --user maxwell  --password aaaaaa --host hadoop162  --database flinkdb --table user_info --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell  --password aaaaaa --host hadoop162  --database flinkdb --table base_province --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell  --password aaaaaa --host hadoop162  --database flinkdb --table sku_info --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell  --password aaaaaa --host hadoop162  --database flinkdb --table spu_info --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell  --password aaaaaa --host hadoop162  --database flinkdb --table base_category3 --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell  --password aaaaaa --host hadoop162  --database flinkdb --table base_trademark --client_id maxwell_1

  6) Check that the data was imported

/opt/module/phoenix-5.0.0/bin/sqlline.py

4.3.2 Wrapping JdbcUtil

  1) Add a queryList() method to JdbcUtil

public static <T> List<T> queryList(Connection conn, String sql, Object[] args, Class<T> tClass) throws SQLException, IllegalAccessException, InstantiationException, InvocationTargetException {
        List<T> result = new ArrayList<>();
        // run one sql query; expect multiple rows, each wrapped into an object of type T
        PreparedStatement ps = conn.prepareStatement(sql);

        // 1. bind values to the placeholders in ps
        for (int i = 0; args != null && i < args.length; i++) {
            ps.setObject(i + 1, args[i]);
        }

        // 2. execute the sql and get the result set (e.g. id  name  age)
        ResultSet resultSet = ps.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            T t = tClass.newInstance(); // create the object via the no-arg constructor, e.g. new User()

            for (int i = 1; i <= metaData.getColumnCount(); i++) { // column indexes start at 1
                String columnName = metaData.getColumnLabel(i);  // column label (the alias, if any)
                Object value = resultSet.getObject(columnName);
                BeanUtils.setProperty(t, columnName, value);
            }
            result.add(t);
        }

        return result;
    }
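
  Note: queryList() runs against a connection created by JdbcUtil.getPhoenixConnection(), which was written in an earlier chapter. For reference, a minimal sketch under assumptions (the ZooKeeper quorum in the URL is a guess):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class JdbcUtil {
    private static final String PHOENIX_URL = "jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181"; // assumed ZK quorum

    public static Connection getPhoenixConnection() throws ClassNotFoundException, SQLException {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); // the Phoenix driver sometimes needs explicit loading
        return DriverManager.getConnection(PHOENIX_URL);
    }
}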

  2) Create DimUtil

package com.yuange.flinkrealtime.util;

import com.alibaba.fastjson.JSONObject;

import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:30
 */
public class DimUtil {
    public static JSONObject readDimFromPhoenix(Connection phoenixConn, String tableName, String id) throws SQLException, IllegalAccessException, InvocationTargetException, InstantiationException {
        // query the data from Phoenix via jdbc
        String sql = "select * from " + tableName + " where id = ?";

        List<JSONObject> list = JdbcUtil.<JSONObject>queryList(phoenixConn, sql, new Object[]{id}, JSONObject.class);

        return list.size() == 0 ? new JSONObject() : list.get(0);
    }
}

  3) Add constants to Constant

public static final String DIM_USER_INFO = "DIM_USER_INFO";
public static final String DIM_BASE_PROVINCE = "DIM_BASE_PROVINCE";
public static final String DIM_SKU_INFO = "DIM_SKU_INFO";
public static final String DIM_SPU_INFO = "DIM_SPU_INFO";
public static final String DIM_BASE_TRADEMARK = "DIM_BASE_TRADEMARK";
public static final String DIM_BASE_CATEGORY3 = "DIM_BASE_CATEGORY3";

4.3.3 Dimension Join Code Listing

  1) The code

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderDetail;
import com.yuange.flinkrealtime.bean.OrderInfo;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.DimUtil;
import com.yuange.flinkrealtime.util.JdbcUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.time.Duration;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:18
 */
public class DwmOrderWideApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwmOrderWideApp().init(
                3003,
                1,
                "DwmOrderWideApp",
                "DwmOrderWideApp",
                Constant.TOPIC_DWD_ORDER_INFO, Constant.TOPIC_DWD_ORDER_DETAIL
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. join the fact tables
        SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator = factJoin(streams);
        // 2. join the dimension data
        SingleOutputStreamOperator<OrderWide> orderWideStreamWithDim = dimJoin(orderWideSingleOutputStreamOperator);
        orderWideStreamWithDim.print();
        // 3. write the wide table to the dwm layer (kafka)
    }

    private SingleOutputStreamOperator<OrderWide> dimJoin(SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator) {
        /* dimension join:
           every OrderWide looks up its dimensions in hbase */
        return orderWideSingleOutputStreamOperator.map(new RichMapFunction<OrderWide, OrderWide>() {
            private Connection phoenixConn;

            @Override
            public void open(Configuration parameters) throws Exception {
                phoenixConn = JdbcUtil.getPhoenixConnection();
            }

            @Override
            public OrderWide map(OrderWide wide) throws Exception {
                // enrich from dim_user_info:  select * from t where id=?
                JSONObject userInfo = DimUtil.readDimFromPhoenix(phoenixConn, Constant.DIM_USER_INFO, wide.getUser_id().toString());
                wide.setUser_gender(userInfo.getString("GENDER"));
                wide.setUser_age(userInfo.getString("BIRTHDAY"));

                // 2. province
                JSONObject baseProvince = DimUtil.readDimFromPhoenix(phoenixConn, Constant.DIM_BASE_PROVINCE, wide.getProvince_id().toString());
                wide.setProvince_3166_2_code(baseProvince.getString("ISO_3166_2"));
                wide.setProvince_area_code(baseProvince.getString("AREA_CODE"));
                wide.setProvince_iso_code(baseProvince.getString("ISO_CODE"));
                wide.setProvince_name(baseProvince.getString("NAME"));

                // 3. sku
                JSONObject skuInfo = DimUtil.readDimFromPhoenix(phoenixConn, Constant.DIM_SKU_INFO, wide.getSku_id().toString());
                wide.setSku_name(skuInfo.getString("SKU_NAME"));

                wide.setSpu_id(skuInfo.getLong("SPU_ID"));
                wide.setTm_id(skuInfo.getLong("TM_ID"));
                wide.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));

                // 4. spu
                JSONObject spuInfo = DimUtil.readDimFromPhoenix(phoenixConn, Constant.DIM_SPU_INFO, wide.getSpu_id().toString());
                wide.setSpu_name(spuInfo.getString("SPU_NAME"));
                // 5. tm
                JSONObject tmInfo = DimUtil.readDimFromPhoenix(phoenixConn, Constant.DIM_BASE_TRADEMARK, wide.getTm_id().toString());
                wide.setTm_name(tmInfo.getString("TM_NAME"));

                // 6. c3
                JSONObject c3Info = DimUtil.readDimFromPhoenix(phoenixConn, Constant.DIM_BASE_CATEGORY3, wide.getCategory3_id().toString());
                wide.setCategory3_name(c3Info.getString("NAME"));
                return wide;
            }
        });
    }

    private SingleOutputStreamOperator<OrderWide> factJoin(Map<String, DataStreamSource<String>> streams) {
        KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
                .map(info -> JSON.parseObject(info, OrderInfo.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((info, ts) -> info.getCreate_ts())
                )
                .keyBy(OrderInfo::getId);

        KeyedStream<OrderDetail, Long> orderDetailStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
                .map(info -> JSON.parseObject(info, OrderDetail.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((detail, ts) -> detail.getCreate_ts())
                )
                .keyBy(OrderDetail::getOrder_id);

        return orderInfoStream.intervalJoin(orderDetailStream)
                .between(Time.seconds(-5),Time.seconds(5))
                .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo left,
                                               OrderDetail right,
                                               Context ctx,
                                               Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(left, right));
                    }
                });
    }
}

  2) Start the program in IDEA, then generate business data and check that the console prints records

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

4.3.4 Dimension Lookup Performance Optimization

  Optimization 1: add a cache-aside pattern

    In the implementation above we query Hbase directly. Queries against external data sources are often the performance bottleneck of streaming computation, so we optimize on top of that implementation using a cache-aside pattern, a very common pattern for caching data on demand. As the diagram shows, every request goes to the cache first; on a hit, the data is returned directly. On a miss, the database is queried and the result is also written into the cache for subsequent requests.

    A few caveats for this caching strategy:

      1) Cache entries need an expiration time, otherwise cold data stays resident in the cache and wastes resources.

      2) Consider whether the dimension data can change; if it can, the cache must be proactively updated.

    Cache options: an on-heap cache (application memory) or a standalone cache service (redis, memcache).

      An on-heap cache is better from a pure performance standpoint, since the data path is shorter and overhead lower. But it is harder to manage: other processes cannot maintain the cached data, and it consumes application memory.

      A standalone cache service (redis, memcache) also performs well, though it adds connection setup and network IO overhead. But if the data can change, a standalone service is easier to manage, and if the data volume is very large, it is easier to scale.

      Since our dimension data is mutable, we use Redis to manage the cache.

    Implementation code:

      1) Add the Jedis dependency

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.2.0</version>
</dependency>

      2) A Jedis utility class

package com.yuange.flinkrealtime.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Author: 袁哥
 * Date: 2021/8/3 8:35
 */
public class RedisUtil {
    static JedisPool pool;

    static {
        JedisPoolConfig conf = new JedisPoolConfig();
        conf.setMaxTotal(300);  // maximum number of connections in the pool
        conf.setMaxIdle(10);
        conf.setMaxWaitMillis(10000);   // maximum wait time for a connection, in ms
        conf.setMinIdle(4);
        conf.setTestOnCreate(true);
        conf.setTestOnBorrow(true);
        conf.setTestOnReturn(true);

        pool = new JedisPool(conf,"hadoop162",6379);
    }

    public static Jedis getRedisClient(){
        Jedis resource = pool.getResource();
        resource.select(1); // use database 1
        return resource;
    }
}

    Choosing the Redis data structure:

      Which data structure should we use in redis? We want reads and writes to be simple, plus the ability to set a separate expiration time per entry (without expiration, cold data would waste memory). All things considered, we choose: string.

| key              | value                               |
| ---------------- | ----------------------------------- |
| table + ":" + id | JSON string of the dimension record |

    Code listing for the dimension join with caching. For convenience, the earlier readDim code is refactored:

      (1) Add the following to DimUtil:

// requires two extra imports in DimUtil: com.alibaba.fastjson.JSON and redis.clients.jedis.Jedis
public static JSONObject readDim(Connection phoenixConn, Jedis client, String tableName, String id) throws InvocationTargetException, SQLException, InstantiationException, IllegalAccessException {
    // try redis first
    JSONObject jsonObject = readDimFromRedis(client, tableName, id);
    if (jsonObject != null) {
        return jsonObject;
    } else {
        jsonObject = readDimFromPhoenix(phoenixConn, tableName, id);
        // write the result into redis for later lookups
        writeDimToRedis(client, tableName, id, jsonObject);
        return jsonObject;
    }
}

private static void writeDimToRedis(Jedis client, String tableName, String id, JSONObject jsonObject) {
    String key = getRedisDimKey(tableName, id);
    String value = jsonObject.toJSONString();
    client.setex(key, 24 * 60 * 60, value); // expire after 24 hours
}

private static JSONObject readDimFromRedis(Jedis client, String tableName, String id) {
    String key = getRedisDimKey(tableName, id); // build the key
    String value = client.get(key);
    if (value != null) {
        // on every hit, reset the expiration to 24 hours so hot data does not expire
        client.expire(key, 24 * 60 * 60);
        return JSON.parseObject(value);
    }
    return null;
}

private static String getRedisDimKey(String tableName, String id) {
    return tableName + ":" + id;
}

      (2) Add redis updates to PhoenixSink: when dimension data in Hbase changes, redis must be updated as well

package com.yuange.flinkrealtime.sink;

import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.util.JdbcUtil;
import com.yuange.flinkrealtime.util.RedisUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/7/30 23:25
 */
public class PhoenixSink extends RichSinkFunction<Tuple2<JSONObject, TableProcess>> {
    Connection conn;
    ValueState<String> tableCreateState;
    private Jedis client;
    @Override
    public void open(Configuration parameters) throws Exception {
        // load the driver first; in many cases this is not required,
        // since most common databases pick a suitable driver from the url,
        // but the Phoenix driver sometimes needs to be loaded manually
        conn = JdbcUtil.getPhoenixConnection();
        // create a state handle to track whether the table has been created
        tableCreateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("tableCreateState", String.class));
        client = RedisUtil.getRedisClient();
        client.select(1);
    }

    @Override
    public void invoke(Tuple2<JSONObject, TableProcess> value, Context context) throws Exception {
        // 1. check the table; if it does not exist, create it in Phoenix
        checkTable(value);
        // 2. then write the data into phoenix
        writeToPhoenix(value);
        // 3. update the redis cache (added for the dimension-cache optimization)
        // only refresh dimensions that already exist in the cache and were updated this time;
        // never write newly inserted dimensions into the cache
        // cruder alternative: simply delete the cache entry
        updateCache(value);
    }

    private void updateCache(Tuple2<JSONObject, TableProcess> value) {
        // {"id": 1} => {"ID": 1}

        // upper-case every key of the json record
        JSONObject data = new JSONObject();
        for (Map.Entry<String, Object> entry : value.f0.entrySet()) {
            data.put(entry.getKey().toUpperCase(), entry.getValue());
        }

        // update the redis cache
        // only when this is an update and the key still exists in redis
        String operateType = value.f1.getOperate_type();
        String key = value.f1.getSink_table().toUpperCase() + ":" + data.get("ID");

        String dim = client.get(key);
        if ("update".equals(operateType) && dim != null) {
            // refresh the cached value
            client.setex(key, 24 * 60 * 60, data.toJSONString());
        }
    }

    private void writeToPhoenix(Tuple2<JSONObject, TableProcess> value) throws SQLException {
        JSONObject data = value.f0;
        TableProcess tp = value.f1;

        // upsert  into user(id, name, age) values(?,?,?)
        // build the SQL
        StringBuilder insertSql = new StringBuilder();
        insertSql
                .append("upsert into ")
                .append(tp.getSink_table())
                .append("(")
                //id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
                .append(tp.getSink_columns())
                .append(")values(")
                // replace every run of non-comma characters with ?
                .append(tp.getSink_columns().replaceAll("[^,]+","?"))
                .append(")");
        PreparedStatement ps = conn.prepareStatement(insertSql.toString());
        // bind values to the placeholders in ps
        String[] columnNames = tp.getSink_columns().split(",");
        for (int i = 0; i < columnNames.length; i++) {
            // fetch the corresponding field's value from the JSONObject
            Object str = data.getString(columnNames[i]);
            ps.setString(i + 1,str == null ? "" : str.toString());
        }

        ps.execute();
        conn.commit();
        ps.close();
    }

    private void checkTable(Tuple2<JSONObject, TableProcess> value) throws IOException, SQLException {
        if (tableCreateState.value() == null){
            // execute sql like:  create table if not exists user(id varchar, age varchar)
            TableProcess tp = value.f1;
            // build the sql
            StringBuilder createSql = new StringBuilder();
            createSql
                    .append("create table if not exists ")
                    .append(tp.getSink_table())
                    .append("(")
                    .append(tp.getSink_columns().replaceAll(","," varchar,"))
                    .append(" varchar, constraint pk primary key(")
                    .append(tp.getSink_pk() == null ? "id" : tp.getSink_pk())
                    .append("))")
                    .append(tp.getSink_extend() == null ? "" : tp.getSink_extend());

            PreparedStatement ps = conn.prepareStatement(createSql.toString());
            ps.execute();
            conn.commit();
            ps.close();
            // update the state
            tableCreateState.update(tp.getSink_table());
        }
    }
}

      Handling the cache when dimension data changes: if the changed record exists in the cache, it should be deleted or updated immediately. Create the DwmOrderWideApp_Cache class:

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderDetail;
import com.yuange.flinkrealtime.bean.OrderInfo;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.DimUtil;
import com.yuange.flinkrealtime.util.JdbcUtil;
import com.yuange.flinkrealtime.util.RedisUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.time.Duration;
import java.util.Map;

/**
 * Author: 袁哥
 * Date: 2021/8/2 21:18
 */
public class DwmOrderWideApp_Cache extends BaseAppV2 {

    public static void main(String[] args) {
        new DwmOrderWideApp_Cache().init(
                3003,
                1,
                "DwmOrderWideApp",
                "DwmOrderWideApp",
                Constant.TOPIC_DWD_ORDER_INFO, Constant.TOPIC_DWD_ORDER_DETAIL
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. join the fact tables
        SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator = factJoin(streams);
        // 2. join the dimension data
        SingleOutputStreamOperator<OrderWide> orderWideStreamWithDim = dimJoin(orderWideSingleOutputStreamOperator);
        orderWideStreamWithDim.print();
        // 3. write the wide table to the dwm layer (kafka)
    }

    private SingleOutputStreamOperator<OrderWide> dimJoin(SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator) {
        /* dimension join:
           every OrderWide looks up its dimensions in hbase */
        return orderWideSingleOutputStreamOperator.map(new RichMapFunction<OrderWide, OrderWide>() {
            private Connection phoenixConn;
            Jedis redisClient;
            @Override
            public void open(Configuration parameters) throws Exception {
                phoenixConn = JdbcUtil.getPhoenixConnection();
                redisClient = RedisUtil.getRedisClient();
            }

            @Override
            public OrderWide map(OrderWide wide) throws Exception {
                // enrich from dim_user_info:  select * from t where id=?
                JSONObject userInfo = DimUtil.readDim(phoenixConn,redisClient, Constant.DIM_USER_INFO, wide.getUser_id().toString());
                wide.setUser_gender(userInfo.getString("GENDER"));
                wide.setUser_age(userInfo.getString("BIRTHDAY"));

                // 2. province
                JSONObject baseProvince = DimUtil.readDim(phoenixConn,redisClient, Constant.DIM_BASE_PROVINCE, wide.getProvince_id().toString());
                wide.setProvince_3166_2_code(baseProvince.getString("ISO_3166_2"));
                wide.setProvince_area_code(baseProvince.getString("AREA_CODE"));
                wide.setProvince_iso_code(baseProvince.getString("ISO_CODE"));
                wide.setProvince_name(baseProvince.getString("NAME"));

                // 3. sku
                JSONObject skuInfo = DimUtil.readDim(phoenixConn,redisClient, Constant.DIM_SKU_INFO, wide.getSku_id().toString());
                wide.setSku_name(skuInfo.getString("SKU_NAME"));

                wide.setSpu_id(skuInfo.getLong("SPU_ID"));
                wide.setTm_id(skuInfo.getLong("TM_ID"));
                wide.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));

                // 4. spu
                JSONObject spuInfo = DimUtil.readDim(phoenixConn,redisClient, Constant.DIM_SPU_INFO, wide.getSpu_id().toString());
                wide.setSpu_name(spuInfo.getString("SPU_NAME"));
                // 5. tm
                JSONObject tmInfo = DimUtil.readDim(phoenixConn,redisClient, Constant.DIM_BASE_TRADEMARK, wide.getTm_id().toString());
                wide.setTm_name(tmInfo.getString("TM_NAME"));

                // 6. c3
                JSONObject c3Info = DimUtil.readDim(phoenixConn,redisClient, Constant.DIM_BASE_CATEGORY3, wide.getCategory3_id().toString());
                wide.setCategory3_name(c3Info.getString("NAME"));
                return wide;
            }

            @Override
            public void close() throws Exception {
                if (redisClient != null){
                    redisClient.close();
                }
                if (phoenixConn != null){
                    phoenixConn.close();
                }
            }
        });

    }

    private SingleOutputStreamOperator<OrderWide> factJoin(Map<String, DataStreamSource<String>> streams) {
        KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
                .map(info -> JSON.parseObject(info, OrderInfo.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((info, ts) -> info.getCreate_ts())
                )
                .keyBy(OrderInfo::getId);

        KeyedStream<OrderDetail, Long> orderDetailStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
                .map(info -> JSON.parseObject(info, OrderDetail.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((detail, ts) -> detail.getCreate_ts())
                )
                .keyBy(OrderDetail::getOrder_id);

        return orderInfoStream.intervalJoin(orderDetailStream)
                .between(Time.seconds(-5),Time.seconds(5))
                .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo left,
                                               OrderDetail right,
                                               Context ctx,
                                               Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(left, right));
                    }
                });
    }
}

    Testing:

      (1) Start Hadoop, Zookeeper, Kafka, maxwell, HBase, and the Flink yarn-session

      (2) Write a Redis startup script and make it executable

vim /home/atguigu/bin/redis.sh
#!/bin/bash
echo "Starting the redis server on hadoop162"
/usr/local/bin/redis-server /opt/module/redis-3.2.12/redis.conf
chmod +x /home/atguigu/bin/redis.sh

      (3) Run the script to start redis

redis.sh

      (4) Package with Maven and upload the jar to /opt/module/applog

      (5) Update realtime.sh

vim /home/atguigu/bin/realtime.sh
#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        #com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
        #com.yuange.flinkrealtime.app.dwm.DwmUvApp
        #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
        com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

      (6) Run the realtime.sh script to submit the jobs to the yarn-session

realtime.sh

      (7) Generate mock business data and test

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

      (8) Start the redis client and check whether database 1 contains data

redis-cli --raw
# select database 1
select 1

  Optimization 2: asynchronous queries

    In Flink stream processing we frequently need to interact with external systems, using dimension tables to fill in fields of the fact stream. For example, in e-commerce a product's sku id is used to look up product attributes such as the product's industry, its manufacturer, and details about that manufacturer; in logistics, a parcel id is used to look up the parcel's industry attributes, shipping information, delivery information, and so on.

    By default, inside a Flink MapFunction each parallel subtask can only interact with the external store synchronously: send a request, block on IO, wait for the response, then send the next request. This synchronous style spends most of its time waiting on the network. Raising the MapFunction's parallelism helps, but more parallelism means more resources, so it is not a good solution.

    Flink 1.2 introduced Async I/O. In asynchronous mode the IO operations are non-blocking: a single parallel subtask can have several requests in flight and handles whichever response returns first, so consecutive requests no longer wait on each other, which greatly improves streaming throughput.

    Async I/O was a highly requested feature contributed to the community by Alibaba; it addresses the case where network latency to external systems becomes the bottleneck of the whole job.

    An asynchronous query effectively hands the dimension-table lookups to a dedicated thread pool, so no single query blocks the subtask and one parallel instance can issue many requests concurrently. This is aimed squarely at network-IO-bound operations, removing the cost of waiting on each request.

    Prerequisite for the async API: implementing proper asynchronous I/O against a database (or key/value store) requires a client that supports asynchronous requests, and many mainstream databases provide one. Lacking such a client, a synchronous client can be turned into a limited-concurrency asynchronous one by creating several clients and dispatching the synchronous calls through a thread pool, although this is usually less efficient than a real asynchronous client. Phoenix currently offers no asynchronous client, so we must take exactly that route: multiple clients plus a thread pool handling the synchronous calls.

    Flink's async I/O API: the API lets users issue asynchronous requests from within a stream job while Flink handles the integration with the data stream, including ordering, event time, and fault tolerance. Given an asynchronous database client, an asynchronous stream transformation consists of three parts (a minimal generic sketch follows the list):

      1) An AsyncFunction that dispatches the requests

      2) A callback that obtains the result of the database interaction and passes it to a ResultFuture

      3) Applying the async I/O operation to a DataStream as a stream transformation
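
    Below is a minimal, generic sketch of how those three parts fit together (not this project's final code, which follows in the next subsection). The lookupFromDb helper is a hypothetical stand-in for any synchronous client call; everything else uses the standard Flink async API.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Part 1: an AsyncFunction that dispatches each request to its own executor
public class GenericDimLookup extends RichAsyncFunction<String, String> {
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);    // runs the blocking lookups
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        executor.submit(() -> {
            String value = lookupFromDb(key);           // hypothetical synchronous client call
            // Part 2: the callback hands the result to the ResultFuture
            resultFuture.complete(Collections.singletonList(value));
        });
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    private String lookupFromDb(String key) { return key; }  // placeholder for a real query
}

// Part 3: apply it to a DataStream, with a 60-second timeout per element:
// AsyncDataStream.unorderedWait(input, new GenericDimLookup(), 60, TimeUnit.SECONDS);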

    Implementation code for the async API:

      1) Add a constant to Constant

public static final String TOPIC_DWM_ORDER_WIDE = "dwm_order_wide";

      2) Create ThreadUtil to obtain a thread pool

package com.yuange.flinkrealtime.util;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author 袁哥
 * @date 2021/8/3 16:44
 */
public class ThreadUtil {

    public static ThreadPoolExecutor getThreadPool(){
        return new ThreadPoolExecutor(
                100,    // core pool size
                300,    // maximum pool size
                1,      // keep-alive time for idle non-core threads
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(100)  // when all core threads are busy, tasks wait in this queue
        );
    }
}
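
      As a quick, hypothetical usage sketch (not part of the project code): tasks are submitted as lambdas; once all 100 core threads are busy, new tasks wait in the queue, and only when the queue is also full are extra threads created, up to the 300-thread ceiling.

ThreadPoolExecutor pool = ThreadUtil.getThreadPool();
pool.submit(() -> System.out.println("a blocking dim lookup would run here"));
pool.shutdown();    // let queued tasks finish, then release the threads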

      3) Create DimAsyncFunction extending the asynchronous RichAsyncFunction class, with the element type as a generic parameter

package com.yuange.flinkrealtime.function;

import com.yuange.flinkrealtime.util.JdbcUtil;
import com.yuange.flinkrealtime.util.RedisUtil;
import com.yuange.flinkrealtime.util.ThreadUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 袁哥
 * @date 2021/8/3 16:43
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T,T> {
    ThreadPoolExecutor threadPool;
    Connection phoenixConnection;
    @Override
    public void open(Configuration parameters) throws Exception {
        threadPool = ThreadUtil.getThreadPool();
        phoenixConnection = JdbcUtil.getPhoenixConnection();
    }

    @Override
    public void close() throws Exception {
        if (phoenixConnection != null) {
            phoenixConnection.close();
        }

        if (threadPool != null) {
            threadPool.shutdown();
        }
    }

    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        // Called when an async operation times out. The default implementation completes
        // the future exceptionally and fails the job, so here we only log the element.
        System.out.println("timeout: " + input);
    }

    public abstract void addDim(Connection phoenixConn,
                                Jedis client,
                                T input,
                                ResultFuture<T> resultFuture) throws Exception;

    @Override
    public void asyncInvoke(T input,
                            ResultFuture<T> resultFuture) throws Exception {
        // The client would ideally support an async api; since Phoenix does not, we use a thread pool instead
        threadPool.submit(() -> {
            // All dimension lookups run here, off the main task thread.
            // Jedis is not thread-safe when used asynchronously, so each task must obtain its own client.
            Jedis client = RedisUtil.getRedisClient();
            try {
                // the business-specific enrichment implemented by subclasses
                addDim(phoenixConnection, client, input, resultFuture);
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("Exception during async execution. " +
                        "Check the async dependencies: are Phoenix, Redis, Hadoop and Maxwell all running? ....");
            } finally {
                client.close();  // always return the client, even if the lookup fails
            }
        });
    }
}
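
      One detail worth calling out: an element is only emitted downstream once resultFuture.complete(...) is called inside addDim, so a subclass that forgets to complete the future will stall the async operator. The pool from ThreadUtil also bounds how many synchronous Phoenix/Redis lookups can be in flight per subtask at any moment.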

      4) Create DwmOrderWideApp_Cache_Async, which processes the data asynchronously and writes it to Kafka

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderDetail;
import com.yuange.flinkrealtime.bean.OrderInfo;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.function.DimAsyncFunction;
import com.yuange.flinkrealtime.util.DimUtil;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import com.yuange.flinkrealtime.util.JdbcUtil;
import com.yuange.flinkrealtime.util.RedisUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author 袁哥
 * @date 2021/8/2 21:18
 */
public class DwmOrderWideApp_Cache_Async extends BaseAppV2 {

    public static void main(String[] args) {
        new DwmOrderWideApp_Cache_Async().init(
                3003,
                1,
                "DwmOrderWideApp",
                "DwmOrderWideApp",
                Constant.TOPIC_DWD_ORDER_INFO, Constant.TOPIC_DWD_ORDER_DETAIL
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. join the fact streams
        SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator = factJoin(streams);
        // 2. join the dimension data
        SingleOutputStreamOperator<OrderWide> orderWideStreamWithDim = dimJoin(orderWideSingleOutputStreamOperator);
        orderWideStreamWithDim.print();
        // 3. write the wide table to the dwm layer (kafka)
        sendToKafka(orderWideStreamWithDim);
    }

    private void sendToKafka(SingleOutputStreamOperator<OrderWide> stream) {
        stream.map(JSON::toJSONString)  // serialize each OrderWide to a JSON string
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_ORDER_WIDE));
    }

    private SingleOutputStreamOperator<OrderWide> dimJoin(SingleOutputStreamOperator<OrderWide> orderWideSingleOutputStreamOperator) {
        return AsyncDataStream.unorderedWait(
                orderWideSingleOutputStreamOperator,    // the stream whose elements need async enrichment
                new DimAsyncFunction<OrderWide>() {

                    @Override
                    public void addDim(Connection phoenixConn,
                                       Jedis redisClient,
                                       OrderWide wide,
                                       ResultFuture<OrderWide> resultFuture) throws Exception {
                        // 1. enrich from dim_user_info:  select * from t where id=?
                        JSONObject userInfo = DimUtil.readDim(phoenixConn, redisClient, Constant.DIM_USER_INFO, wide.getUser_id().toString());
                        wide.setUser_gender(userInfo.getString("GENDER"));
                        wide.setUser_age(userInfo.getString("BIRTHDAY"));

                        // 2. province
                        JSONObject baseProvince = DimUtil.readDim(phoenixConn, redisClient, Constant.DIM_BASE_PROVINCE, wide.getProvince_id().toString());
                        wide.setProvince_3166_2_code(baseProvince.getString("ISO_3166_2"));
                        wide.setProvince_area_code(baseProvince.getString("AREA_CODE"));
                        wide.setProvince_iso_code(baseProvince.getString("ISO_CODE"));
                        wide.setProvince_name(baseProvince.getString("NAME"));

                        // 3. sku
                        JSONObject skuInfo = DimUtil.readDim(phoenixConn, redisClient, Constant.DIM_SKU_INFO, wide.getSku_id().toString());
                        wide.setSku_name(skuInfo.getString("SKU_NAME"));

                        wide.setSpu_id(skuInfo.getLong("SPU_ID"));
                        wide.setTm_id(skuInfo.getLong("TM_ID"));
                        wide.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));

                        // 4. spu
                        JSONObject spuInfo = DimUtil.readDim(phoenixConn, redisClient, Constant.DIM_SPU_INFO, wide.getSpu_id().toString());
                        wide.setSpu_name(spuInfo.getString("SPU_NAME"));
                        // 5. trademark
                        JSONObject tmInfo = DimUtil.readDim(phoenixConn, redisClient, Constant.DIM_BASE_TRADEMARK, wide.getTm_id().toString());
                        wide.setTm_name(tmInfo.getString("TM_NAME"));

                        // 6. category3
                        JSONObject c3Info = DimUtil.readDim(phoenixConn, redisClient, Constant.DIM_BASE_CATEGORY3, wide.getCategory3_id().toString());
                        wide.setCategory3_name(c3Info.getString("NAME"));

                        resultFuture.complete(Collections.singletonList(wide));
                    }
                },  // the async enrichment function
                60, // timeout for each async operation
                TimeUnit.SECONDS    // unit of the timeout
        );
    }

    private SingleOutputStreamOperator<OrderWide> factJoin(Map<String, DataStreamSource<String>> streams) {
        KeyedStream<OrderInfo, Long> orderInfoStream = streams.get(Constant.TOPIC_DWD_ORDER_INFO)
                .map(info -> JSON.parseObject(info, OrderInfo.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((info, ts) -> info.getCreate_ts())
                )
                .keyBy(OrderInfo::getId);

        KeyedStream<OrderDetail, Long> orderDetailStream = streams.get(Constant.TOPIC_DWD_ORDER_DETAIL)
                .map(info -> JSON.parseObject(info, OrderDetail.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((detail, ts) -> detail.getCreate_ts())
                )
                .keyBy(OrderDetail::getOrder_id);

        return orderInfoStream.intervalJoin(orderDetailStream)
                .between(Time.seconds(-5),Time.seconds(5))
                .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo left,
                                               OrderDetail right,
                                               Context ctx,
                                               Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(left, right));
                    }
                });
    }
}
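
      Note that dimJoin uses AsyncDataStream.unorderedWait, which emits every enriched record as soon as its lookups complete, giving the lowest latency; AsyncDataStream.orderedWait would preserve the input order instead, at the cost of buffering results until all earlier elements finish. The order of wide records does not affect the enrichment itself, so the cheaper variant is the natural choice here.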

      5) Package flink-realtime and upload it to /opt/module/applog

      6) Modify the realtime.sh startup script

vim /home/atguigu/bin/realtime.sh
#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
        com.yuange.flinkrealtime.app.dwm.DwmUvApp
        #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
        #com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache
        com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

      7) Start the yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

      8) Clear database 1 in Redis (inside redis-cli, after select 1)

flushdb

      9) Run the realtime.sh script to submit the jobs to the yarn-session

realtime.sh

      10) Start a consumer to verify that the data reaches Kafka

consume dwm_order_wide

      11) Generate business data, simulating new records

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

      12) Check whether Redis contains data

      13) Check the consumer output

Chapter 5 DWM Layer: Payment Wide Table

5.1 Requirement Analysis and Approach

  The main motivation for a payment wide table is that the payment table carries no order-detail information: payment amounts are not broken down to individual products, so product-level payment statistics cannot be computed. The core of this wide table is therefore to join the payment records with the order details.

  There are three possible solutions (the implementation below takes the third, reusing the intervalJoin technique from the second):

    1) Write the order detail table (or the wide table) to HBase and query HBase while building the payment wide table; this treats order details as just another dimension.

    2) Consume the order details as a stream and merge the two streams with a join. Because a payment happens some time after its order, we must use intervalJoin to control how long the stream state is kept, guaranteeing that the order details are still in state when the payment arrives.

    3) Join the payment stream with the order wide table stream, which saves the dimension-table lookup step.

5.2 Implementation Code

5.2.1 POJOs

  1) Payment entity class

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**
 * @author 袁哥
 * @date 2021/8/3 18:28
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class PaymentInfo {
    private Long id;
    private Long order_id;
    private Long user_id;
    private BigDecimal total_amount;
    private String subject;
    private String payment_type;
    private String create_time;
    private String callback_time;
}

  2) Payment wide-table entity class

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.beanutils.BeanUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;

/**
 * @author 袁哥
 * @date 2021/8/3 18:29
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class PaymentWide {
    private Long payment_id;
    private String subject;
    private String payment_type;
    private String payment_create_time;
    private String callback_time;
    private Long detail_id;
    private Long order_id;
    private Long sku_id;
    private BigDecimal order_price;
    private Long sku_num;
    private String sku_name;
    private Long province_id;
    private String order_status;
    private Long user_id;
    private BigDecimal total_amount;
    private BigDecimal activity_reduce_amount;
    private BigDecimal coupon_reduce_amount;
    private BigDecimal original_total_amount;
    private BigDecimal feight_fee;
    private BigDecimal split_feight_fee;
    private BigDecimal split_activity_amount;
    private BigDecimal split_coupon_amount;
    private BigDecimal split_total_amount;
    private String order_create_time;

    private String province_name;  // obtained by dimension-table lookup
    private String province_area_code;
    private String province_iso_code;
    private String province_3166_2_code;
    private Integer user_age;
    private String user_gender;

    private Long spu_id;     // dimension attributes that must be joined in
    private Long tm_id;
    private Long category3_id;
    private String spu_name;
    private String tm_name;
    private String category3_name;

    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
        mergeOrderWide(orderWide);
        mergePaymentInfo(paymentInfo);

    }

    public void mergePaymentInfo(PaymentInfo paymentInfo) {
        if (paymentInfo != null) {
            try {
                BeanUtils.copyProperties(this, paymentInfo);
                payment_create_time = paymentInfo.getCreate_time();
                payment_id = paymentInfo.getId();
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }

    public void mergeOrderWide(OrderWide orderWide) {
        if (orderWide != null) {
            try {
                BeanUtils.copyProperties(this, orderWide);
                order_create_time = orderWide.getCreate_time();
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }
}
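
  Note the argument order of commons-beanutils BeanUtils.copyProperties(dest, orig): properties are copied from the second argument into the first, i.e. from paymentInfo/orderWide into this PaymentWide. Fields whose names differ between source and target (payment_id vs id, payment_create_time vs create_time, order_create_time vs create_time) are then patched by hand after the copy.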

5.2.2 Join Code

  1) Add the constants to Constant

public static final String TOPIC_DWD_PAYMENT_INFO = "dwd_payment_info";
public static final String TOPIC_DWM_PAYMENT_WIDE = "dwm_payment_wide";

  2) Add a toTs() method to the YuangeCommonUtil utility class that converts a String time into a long epoch timestamp

public static long toTs(String create_time) {
    try {
        // parse "yyyy-MM-dd HH:mm:ss" into epoch milliseconds
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(create_time).getTime();
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return 0L;
}
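
  A minimal usage sketch (the value is illustrative):

long ts = YuangeCommonUtil.toTs("2021-08-03 18:28:00");
System.out.println(ts);   // epoch milliseconds, interpreted in the JVM's default time zone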

  3) Create the DwmPaymentWideApp class

package com.yuange.flinkrealtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.bean.PaymentInfo;
import com.yuange.flinkrealtime.bean.PaymentWide;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Map;

/**
 * @author 袁哥
 * @date 2021/8/3 18:31
 */
public class DwmPaymentWideApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwmPaymentWideApp().init(
                3004,
                1,
                "DwmPaymentWideApp",
                "DwmPaymentWideApp",
                Constant.TOPIC_DWD_PAYMENT_INFO, Constant.TOPIC_DWM_ORDER_WIDE
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        KeyedStream<PaymentInfo, Long> paymentInfoStream = streams.get(Constant.TOPIC_DWD_PAYMENT_INFO)
                .map(s -> JSON.parseObject(s, PaymentInfo.class))   // parse records from the dwd_payment_info topic into PaymentInfo objects
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> YuangeCommonUtil.toTs(element.getCreate_time()))
                )
                .keyBy(PaymentInfo::getOrder_id);

        KeyedStream<OrderWide, Long> orderWideStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                .map(s -> JSON.parseObject(s, OrderWide.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> YuangeCommonUtil.toTs(element.getCreate_time()))
                )
                .keyBy(OrderWide::getOrder_id);

        paymentInfoStream.intervalJoin(orderWideStream)
                .between(Time.minutes(-45), Time.seconds(10))
                .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                    @Override
                    public void processElement(PaymentInfo left,
                                               OrderWide right,
                                               Context ctx,
                                               Collector<PaymentWide> out) throws Exception {
                        out.collect(new PaymentWide(left,right));
                    }
                })
                .map(t->JSON.toJSONString(t))
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_PAYMENT_WIDE));
    }
}
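
  A note on the join bounds: between(Time.minutes(-45), Time.seconds(10)) means a payment may arrive up to 45 minutes after its order wide record, presumably matching the platform's payment timeout, so the order state must be retained that long; the 10-second upper bound only absorbs minor out-of-order arrival. Widening the window means keeping more state, so the bounds should track the real payment window.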

  4) The jobs started earlier keep running; there is no need to stop them

  5) Package the project and upload it to the Linux host

  6) Submit it to the yarn-session, then check that Kafka has the dwm_payment_wide topic and that data is arriving

Chapter 6 Summary

  The main responsibility of the DWM-layer code is to compute one kind of detail data into another kind of detail data that the subsequent statistics can build on. After this stage you should have mastered:

  1) Using state to deduplicate. (Requirement: UV computation)

  2) Using CEP to filter and evaluate a sequence of events. (Requirement: jump-out detection)

  3) Using intervalJoin to join streams

  4) Handling dimension joins, and optimizing them with caching and asynchronous queries.

