Filnk實時數倉(DWS層)


第1章 DWM層和DWS設計

1.1 設計思路

  DWM(Data WareHouse Middle),一般稱為數據中間層. 該層會在DWD層的基礎上,對數據做輕度的聚合操作,生成一系列的中間表,提升公共指標的復用性,減少重復加工。直觀來講,就是對通用的核心維度進行聚合操作,算出相應的統計指標。

  我們在之前通過分流等手段,把數據分拆成了獨立的kafka topic。那么接下來如何處理數據,就要思考一下我們到底要通過實時計算出哪些指標項。

  因為實時計算與離線不同,實時計算的開發和運維成本都是非常高的,要結合實際情況考慮是否有必要象離線數倉一樣,建一個大而全的中間層。如果沒有必要大而全,這時候就需要大體規划一下要實時計算出的指標需求了。把這些指標以主題寬表的形式輸出就是我們的DWS層

1.2 需求梳理

統計主題

需求指標

輸出方式

計算來源

來源層級

訪客

pv

可視化大屏

page_log直接可求

dwd

uv

可視化大屏

需要用page_log過濾去重

dwm

跳出率

可視化大屏

需要通過page_log行為判斷

dwd

連續訪問頁面數

可視化大屏

需要識別開始訪問標識

dwd

連續訪問時長

可視化大屏

需要識別開始訪問標識

dwd

商品

點擊

多維分析

page_log直接可求

dwd

曝光

多維分析

page_log直接可求

dwd

收藏

多維分析

收藏表

dwd

加入購物車

多維分析

購物車表

dwd

下單

可視化大屏

訂單寬表

dwm

支付

多維分析

支付寬表

dwm

退款

多維分析

退款表

dwd

評論

多維分析

評論表

dwd

地區

pv

多維分析

page_log直接可求

dwd

uv

多維分析

需要用page_log過濾去重

dwm

下單

可視化大屏

訂單寬表

dwd

關鍵詞

搜索關鍵詞

可視化大屏

頁面訪問日志 直接可求

dwd

點擊商品關鍵詞

可視化大屏

商品主題下單再次聚合

dws

下單商品關鍵詞

可視化大屏

商品主題下單再次聚合

dws

1.3 DWS層的定位是什么

  1)輕度聚合,因為ADS層要應對很多實時查詢,如果是完全的明細,那么查詢的壓力是非常大的。

  2)將更多的實時數據以主題的方式組合起來便於管理,同時也能減少維度查詢的次數。

第2章 DWS層:訪客主題寬表

訪客

pv

可視化大屏

page_log直接可求

dwd

uv

可視化大屏

需要用page_log過濾去重

dwm

跳出率

可視化大屏

需要通過跳出明細和page_log行為判斷

dwd/dwm

連續訪問頁面數

可視化大屏

需要識別開始訪問標識

dwd

連續訪問時長

可視化大屏

需要識別開始訪問標識

dwd

  設計一張DWS層的表其實就兩件事:維度和度量(事實數據)

    1)度量包括PV、UV、跳出次數、連續訪問頁面數、連續訪問時長

    2)維度包括在分析中比較重要的幾個字段:渠道、地區、版本、新老用戶進行聚合

2.1 需求分析與思路

  1)接收各個明細數據,變為數據流

  2)把數據流合並在一起,成為一個相同格式對象的數據流

  3)對合並的流進行聚合,聚合的時間窗口決定了數據的時效性

  4)把聚合結果寫在數據庫中

2.2 具體實現代碼

2.2.1 定義主題寬表的POJO

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @作者:袁哥
 * @時間:2021/8/4 20:07
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class VisitorStats {
    //統計開始時間
    private String stt;
    //統計結束時間
    private String edt;
    //維度:版本
    private String vc;
    //維度:渠道
    private String ch;
    //維度:地區
    private String ar;
    //維度:新老用戶標識
    private String is_new;
    //度量:獨立訪客數
    private Long uv_ct = 0L;
    //度量:頁面訪問數
    private Long pv_ct = 0L;
    //度量: 進入次數
    private Long sv_ct = 0L;
    //度量: 跳出次數
    private Long uj_ct = 0L;
    //度量: 持續訪問時間
    private Long dur_sum = 0L;
    //統計時間
    private Long ts;
}

2.2.2 消費Kafka數據, 4條流合並為一個流 

  1)消費Kafka數據, 解析成pojo, 並把流合並為一個流

package com.yuange.flinkrealtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.VisitorStats;
import com.yuange.flinkrealtime.common.Constant;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Map;

/**
 * @作者:袁哥
 * @時間:2021/8/4 20:17
 */
public class DwsVisitorStatsApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwsVisitorStatsApp().init(
                4001,
                1,
                "DwsVisitorStatsApp",
                "DwsVisitorStatsApp",
                Constant.TOPIC_DWD_PAGE,Constant.TOPIC_DWM_UV,Constant.TOPIC_DWM_USER_JUMP_DETAIL
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. 解析流, 並按照統一的格式union在一起
        DataStream<VisitorStats> visitorStatsDataStream = parseAndUnion(streams);
        visitorStatsDataStream.print();
    }

    private DataStream<VisitorStats> parseAndUnion(Map<String, DataStreamSource<String>> streams) {
        DataStreamSource<String> pageStream = streams.get(Constant.TOPIC_DWD_PAGE);
        DataStreamSource<String> uvStream = streams.get(Constant.TOPIC_DWM_UV);
        DataStreamSource<String> userJumpStream = streams.get(Constant.TOPIC_DWM_USER_JUMP_DETAIL);

        // 1. 計算pv 和持續訪問時長
        SingleOutputStreamOperator<VisitorStats> pvAndDuringTimeStatsStream = pageStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    //將數據轉化為JSON格式

            JSONObject common = jsonObject.getJSONObject("common"); //取出common數據
            String vc = common.getString("vc");
            String ch = common.getString("ch");
            String ar = common.getString("ar");
            String is_new = common.getString("is_new");

            JSONObject page = jsonObject.getJSONObject("page"); //取出page數據
            Long during_time = page.getLong("during_time");

            Long ts = jsonObject.getLong("ts"); //取出ts字段

            VisitorStats visitorStats = new VisitorStats(
                    "", "",
                    vc, ch, ar, is_new,
                    0L, 1L, 0L, 0L, during_time,
                    ts
            );
            return visitorStats;
        });

        // 2. 計算uv
        SingleOutputStreamOperator<VisitorStats> uvStatsStream = uvStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    //將數據轉化為JSON格式
            JSONObject common = jsonObject.getJSONObject("common");
            String vc = common.getString("vc");
            String ch = common.getString("ch");
            String ar = common.getString("ar");
            String is_new = common.getString("is_new");

            Long ts = jsonObject.getLong("ts");

            VisitorStats visitorStats = new VisitorStats(
                    "", "",
                    vc, ch, ar, is_new,
                    1L, 0L, 0L, 0L, 0L,
                    ts
            );
            return visitorStats;
        });

        // 3. 計算跳出次數
        SingleOutputStreamOperator<VisitorStats> ujStatsStream = userJumpStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    //將數據轉化為JSON格式
            JSONObject common = jsonObject.getJSONObject("common");
            String vc = common.getString("vc");
            String ch = common.getString("ch");
            String ar = common.getString("ar");
            String is_new = common.getString("is_new");

            Long ts = jsonObject.getLong("ts");

            VisitorStats visitorStats = new VisitorStats(
                    "", "",
                    vc, ch, ar, is_new,
                    0L, 0L, 0L, 1L, 0L,
                    ts
            );
            return visitorStats;
        });

        // 4. 計算sv進入從哪個數據源?
        SingleOutputStreamOperator<VisitorStats> svStatsStream = pageStream.flatMap(new FlatMapFunction<String, VisitorStats>() {
            @Override
            public void flatMap(String value, Collector<VisitorStats> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);    //將數據轉化為JSON格式
                String last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");

                if (last_page_id == null || last_page_id.length() == 0) {    //無跳出行為
                    JSONObject common = jsonObject.getJSONObject("common");
                    String vc = common.getString("vc");
                    String ch = common.getString("ch");
                    String ar = common.getString("ar");
                    String is_new = common.getString("is_new");

                    Long ts = jsonObject.getLong("ts");

                    VisitorStats visitorStats = new VisitorStats(
                            "", "",
                            vc, ch, ar, is_new,
                            0L, 0L, 1L, 0L, 0L,
                            ts
                    );
                    out.collect(visitorStats);
                }
            }
        });
        return pvAndDuringTimeStatsStream.union(uvStatsStream,ujStatsStream,svStatsStream);
    }
}

  2)啟動Hadoop

hadoop.sh start

  3)啟動Zookeeper

zk start

  4)啟動Kafka

kafka.sh start

  5)啟動日志服務器

log-lg.sh start

  6)啟動Flink的yarn-session模式

/opt/module/flink-yarn/bin/yarn-session.sh -d

  7)提交DwdLogApp、DwmUvApp、DwmJumpDetailApp_Two程序至yarn-session上

realtime.sh

  8)在Idea中啟動DwsVisitorStatsApp

  9)生產日志數據

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  10)查看控制是否有打印

2.2.3 根據維度進行聚合 

  1)代碼如下

package com.yuange.flinkrealtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.VisitorStats;
import com.yuange.flinkrealtime.common.Constant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Map;

/**
 * @作者:袁哥
 * @時間:2021/8/4 20:17
 */
public class DwsVisitorStatsApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwsVisitorStatsApp().init(
                4001,
                1,
                "DwsVisitorStatsApp",
                "DwsVisitorStatsApp",
                Constant.TOPIC_DWD_PAGE,Constant.TOPIC_DWM_UV,Constant.TOPIC_DWM_USER_JUMP_DETAIL
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. 解析流, 並按照統一的格式union在一起
        DataStream<VisitorStats> visitorStatsDataStream = parseAndUnion(streams);
//        visitorStatsDataStream.print();
        // 2. 開窗聚合
        SingleOutputStreamOperator<VisitorStats> aggregatedStream = aggWithWindow(visitorStatsDataStream);
        aggregatedStream.print();
    }

    private SingleOutputStreamOperator<VisitorStats> aggWithWindow(DataStream<VisitorStats> visitorStatsDataStream) {
        return visitorStatsDataStream
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                            .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                )
                .keyBy(vs -> vs.getVc() + "_" + vs.getCh() + "_" + vs.getAr() + "_" + vs.getIs_new())   //以維度信息作為key
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(new OutputTag<VisitorStats>("late"){})
                .reduce(
                        new ReduceFunction<VisitorStats>() {
                            @Override
                            public VisitorStats reduce(VisitorStats value1,
                                                       VisitorStats value2) throws Exception {
                                value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct());
                                value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct());
                                value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct());
                                value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct());
                                value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum());
                                return value1;
                            }
                        },
                        new ProcessWindowFunction<VisitorStats, VisitorStats, String, TimeWindow>() {
                            SimpleDateFormat simpleDateFormat;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            }

                            @Override
                            public void process(String s,
                                                Context context,
                                                Iterable<VisitorStats> elements,
                                                Collector<VisitorStats> out) throws Exception {
                                TimeWindow window = context.window();
                                String start = simpleDateFormat.format(window.getStart());
                                String end = simpleDateFormat.format(window.getEnd());

                                VisitorStats visitorStats = elements.iterator().next();
                                visitorStats.setStt(start);
                                visitorStats.setEdt(end);
                                out.collect(visitorStats);
                            }
                        }
                );
    }

    private DataStream<VisitorStats> parseAndUnion(Map<String, DataStreamSource<String>> streams) {
        DataStreamSource<String> pageStream = streams.get(Constant.TOPIC_DWD_PAGE);
        DataStreamSource<String> uvStream = streams.get(Constant.TOPIC_DWM_UV);
        DataStreamSource<String> userJumpStream = streams.get(Constant.TOPIC_DWM_USER_JUMP_DETAIL);

        // 1. 計算pv 和持續訪問時長
        SingleOutputStreamOperator<VisitorStats> pvAndDuringTimeStatsStream = pageStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    //將數據轉化為JSON格式

            JSONObject common = jsonObject.getJSONObject("common"); //取出common數據
            String vc = common.getString("vc");
            String ch = common.getString("ch");
            String ar = common.getString("ar");
            String is_new = common.getString("is_new");

            JSONObject page = jsonObject.getJSONObject("page"); //取出page數據
            Long during_time = page.getLong("during_time");

            Long ts = jsonObject.getLong("ts"); //取出ts字段

            VisitorStats visitorStats = new VisitorStats(
                    "", "",
                    vc, ch, ar, is_new,
                    0L, 1L, 0L, 0L, during_time,
                    ts
            );
            return visitorStats;
        });

        // 2. 計算uv
        SingleOutputStreamOperator<VisitorStats> uvStatsStream = uvStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    //將數據轉化為JSON格式
            JSONObject common = jsonObject.getJSONObject("common");
            String vc = common.getString("vc");
            String ch = common.getString("ch");
            String ar = common.getString("ar");
            String is_new = common.getString("is_new");

            Long ts = jsonObject.getLong("ts");

            VisitorStats visitorStats = new VisitorStats(
                    "", "",
                    vc, ch, ar, is_new,
                    1L, 0L, 0L, 0L, 0L,
                    ts
            );
            return visitorStats;
        });

        // 3. 計算跳出次數
        SingleOutputStreamOperator<VisitorStats> ujStatsStream = userJumpStream.map(s -> {
            JSONObject jsonObject = JSON.parseObject(s);    //將數據轉化為JSON格式
            JSONObject common = jsonObject.getJSONObject("common");
            String vc = common.getString("vc");
            String ch = common.getString("ch");
            String ar = common.getString("ar");
            String is_new = common.getString("is_new");

            Long ts = jsonObject.getLong("ts");

            VisitorStats visitorStats = new VisitorStats(
                    "", "",
                    vc, ch, ar, is_new,
                    0L, 0L, 0L, 1L, 0L,
                    ts
            );
            return visitorStats;
        });

        // 4. 計算sv進入從哪個數據源?
        SingleOutputStreamOperator<VisitorStats> svStatsStream = pageStream.flatMap(new FlatMapFunction<String, VisitorStats>() {
            @Override
            public void flatMap(String value, Collector<VisitorStats> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);    //將數據轉化為JSON格式
                String last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");

                if (last_page_id == null || last_page_id.length() == 0) {    //無跳出行為
                    JSONObject common = jsonObject.getJSONObject("common");
                    String vc = common.getString("vc");
                    String ch = common.getString("ch");
                    String ar = common.getString("ar");
                    String is_new = common.getString("is_new");

                    Long ts = jsonObject.getLong("ts");

                    VisitorStats visitorStats = new VisitorStats(
                            "", "",
                            vc, ch, ar, is_new,
                            0L, 0L, 1L, 0L, 0L,
                            ts
                    );
                    out.collect(visitorStats);
                }
            }
        });

        return pvAndDuringTimeStatsStream.union(uvStatsStream,ujStatsStream,svStatsStream);
    }
}

  2)在Idea中啟動DwsVisitorStatsApp

  3)生產日志數據

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  4)查看控制台

2.2.4 寫入OLAP數據庫

  1)為何要寫入ClickHouse數據庫?

    ClickHouse數據庫作為專門解決大量數據統計分析的數據庫,在保證了海量數據存儲的能力,同時又兼顧了響應速度。而且還支持標准SQL,即靈活又易上手。

  2)在clickhouse中創建表

    (1)啟動clickhouse

sudo systemctl start clickhouse-server

    (2)進入clickhouse客戶端

clickhouse-client -m

    (3)建庫並使用

create database flinkdb;
use flinkdb;

    (4)建表

create table  visitor_stats_2021 (
        stt DateTime,
        edt DateTime,
        vc  String,
        ch  String ,
        ar  String ,
        is_new String ,
        uv_ct UInt64,
        pv_ct UInt64,
        sv_ct UInt64,
        uj_ct UInt64,
        dur_sum  UInt64,
        ts UInt64
        ) engine =ReplacingMergeTree( ts)
        partition by  toYYYYMMDD(stt)
        order by  ( stt,edt,is_new,vc,ch,ar);

  3)加入jdbc-connectorClickHouse依賴包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--ClickHouse 依賴開始-->
<!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.11.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5</version>
</dependency>
<!--ClickHouse 依賴結束-->

  4)在Constant中添加常量

public static final String CLICKHOUSE_DB = "flinkdb";
    public static final String TABLE_VISITOR_STATS = "visitor_stats_2021";
    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    public static final String CLICKHOUSE_URL_PRE = "jdbc:clickhouse://hadoop164:8123";

  5)封裝FlinkSinkUtil工具類,考慮后面多個地方需要向clickhouse寫入數據, 方便使用, 所以對FlinkSinkUtil做封裝

public static <T> SinkFunction<T> getClickHouseSink(String db,
                                                        String table,
                                                        Class<T> tClass) {
        Field[] fields = tClass.getDeclaredFields();

        String clickhouseDriver = Constant.CLICKHOUSE_DRIVER;
        String url = Constant.CLICKHOUSE_URL_PRE + "/" + db;

        StringBuilder sql = new StringBuilder();
        sql.append("insert into ")
                .append(table)
                .append("(");
        //找到字段名
        for (Field field : fields) {
            sql.append(field.getName())
                    .append(",");
        }
        sql.deleteCharAt(sql.length() - 1); //把最后一個逗號刪除
        sql.append(") values(");
        for (Field field : fields) {
            sql.append("?,");
        }
        sql.deleteCharAt(sql.length() - 1);
        sql.append(")");

//        System.out.println(sql.toString());
        //借助jdbc sink封裝一個ClickHouse sink
        return getJdbcSink(
                url,
                clickhouseDriver,
                sql.toString()
        );
    }

    private static <T> SinkFunction<T> getJdbcSink(String url, String clickhouseDriver, String sql) {
        return JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T t) throws SQLException {
                        Class<?> aClass = t.getClass();
                        try {
                            Field[] fields = aClass.getDeclaredFields();
                            for (int i = 0; i < fields.length; i++) {
                                Field field = fields[i];
                                field.setAccessible(true);  //反射對象在使用時應禁止 Java 語言訪問檢查
                                Object v = field.get(t);
                                ps.setObject(i + 1,v);
                            }
                        }catch (IllegalAccessException e){
                            e.printStackTrace();
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchIntervalMs(100)   //100毫秒處理一次數據
                        .withMaxRetries(3)  //失敗后最大重試次數
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(clickhouseDriver)
                        .build()
        );
    }

  6)寫入到ClickHouse中

@Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. 解析流, 並按照統一的格式union在一起
        DataStream<VisitorStats> visitorStatsDataStream = parseAndUnion(streams);
//        visitorStatsDataStream.print();
        // 2. 開窗聚合
        SingleOutputStreamOperator<VisitorStats> aggregatedStream = aggWithWindow(visitorStatsDataStream);
//        aggregatedStream.print();
        // 3. 寫入到ClickHouse中
        writeToClickHouse(aggregatedStream);
    }

    private void writeToClickHouse(SingleOutputStreamOperator<VisitorStats> aggregatedStream) {
        // 自定義ck sink
        aggregatedStream.addSink(FlinkSinkUtil.getClickHouseSink(Constant.CLICKHOUSE_DB, Constant.TABLE_VISITOR_STATS, VisitorStats.class));
    }

  7)打包上傳至Linux的/opt/module/applog/

  8)在realtime.sh腳本中添加DwsVisitorStatsApp

com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp

  9)運行腳本,啟動程序

realtime.sh

  10)生產日志數據

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  11)查看clickhouse中有沒有數據

clickhouse-client -m --host=hadoop164
use flinkdb;
select count(*) from visitor_stats_2021;

第3章 DWS層:商品主題寬表

統計主題

需求指標

輸出方式

計算來源

來源層級

商品

點擊

多維分析

dwd_page_log直接可求

dwd

曝光

多維分析

dwd_display_log直接可求

dwd

收藏

多維分析

收藏表

dwd

加入購物車

多維分析

購物車表

dwd

下單

可視化大屏

訂單寬表

dwm

支付

多維分析

支付寬表

dwm

退款

多維分析

退款表

dwd

評價

多維分析

評價表

dwd

  與訪客的dws層的寬表類似,也是把多個事實表的明細數據匯總起來組合成寬表

3.1 需求分析與思路

  1)從Kafka主題中獲得數據流

  2)Json字符串數據流轉換為統一數據對象的數據流

  3)把統一的數據結構流合並為一個流

  4)設定事件時間與水位線

  5)分組、開窗、聚合

  6)寫入ClickHouse

3.2 功能實現

3.2.1 自定義注解

package com.yuange.flinkrealtime.app.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @作者:袁哥
 * @時間:2021/8/6 18:25
 */

@Target(ElementType.FIELD)  //作用在類的成員變量上
@Retention(RetentionPolicy.RUNTIME) //運行時使用
public @interface NoSink {
}

3.2.2 封裝商品統計實體類ProductStats

package com.yuange.flinkrealtime.bean;

import com.yuange.flinkrealtime.app.annotation.NoSink;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

/**
 * @作者:袁哥
 * @時間:2021/8/6 18:24
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductStats {

    private String stt;//窗口起始時間
    private String edt;  //窗口結束時間
    private Long sku_id; //sku編號
    private String sku_name;//sku名稱
    private BigDecimal sku_price; //sku單價
    private Long spu_id; //spu編號
    private String spu_name;//spu名稱
    private Long tm_id; //品牌編號
    private String tm_name;//品牌名稱
    private Long category3_id;//品類編號
    private String category3_name;//品類名稱

    private Long click_ct = 0L;  //點擊數
    private Long display_ct = 0L; //曝光數

    private Long favor_ct = 0L; //收藏數

    private Long cart_ct = 0L;  //添加購物車數

    private Long order_sku_num = 0L; //下單商品個數

    //下單商品金額  不是整個訂單的金額
    private BigDecimal order_amount = BigDecimal.ZERO;

    private Long order_ct = 0L; //訂單數

    //支付金額
    private BigDecimal payment_amount = BigDecimal.ZERO;

    private Long paid_order_ct = 0L;  //支付訂單數

    private Long refund_order_ct = 0L; //退款訂單數

    private BigDecimal refund_amount = BigDecimal.ZERO;

    private Long comment_ct = 0L;//評論訂單數

    private Long good_comment_ct = 0L; //好評訂單數

    @NoSink
    private Set<Long> orderIdSet = new HashSet<>();  //用於統計訂單數  存儲下單的訂單id, 通過這個集合的長度來得到訂單數

    @NoSink
    private Set<Long> paidOrderIdSet = new HashSet<>(); //用於統計支付訂單數

    @NoSink
    private Set<Long> refundOrderIdSet = new HashSet<>();//用於退款支付訂單數

    private Long ts; //統計時間戳
}

3.2.4 在Constant中添加常量

public static final String TOPIC_DWD_FAVOR_INFO = "dwd_favor_info";
    public static final String TOPIC_DWD_CART_INFO = "dwd_cart_info";
    public static final String TOPIC_DWD_ORDER_REFUND_INFO = "dwd_order_refund_info";
    public static final String TOPIC_DWD_COMMENT_INFO = "dwd_comment_info";

    public static final Object FOUR_STAR_COMMENT = "1204";
    public static final Object FIVE_STAR_COMMENT = "1205";

3.2.5 消費Kfka數據, 合成一個流

  1)代碼如下,編寫完成后在Idea中啟動它

package com.yuange.flinkrealtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.bean.PaymentWide;
import com.yuange.flinkrealtime.bean.ProductStats;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Map;

/**
 * @作者:袁哥
 * @時間:2021/8/6 18:19
 */
public class DwsProductStatsApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwsProductStatsApp().init(
                4002,
                1,
                "DwsProductStatsApp",
                "DwsProductStatsApp",
                Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWD_DISPLAY,
                Constant.TOPIC_DWD_FAVOR_INFO, Constant.TOPIC_DWD_CART_INFO,
                Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                Constant.TOPIC_DWD_ORDER_REFUND_INFO, Constant.TOPIC_DWD_COMMENT_INFO
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        // 1. 把8個流union在一起
        DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
        productStatsDataStream.print();
    }

    private DataStream<ProductStats> unionStreams(Map<String, DataStreamSource<String>> streams) {
        //1. 點擊
        SingleOutputStreamOperator<ProductStats> clickStatsStream = streams.get(Constant.TOPIC_DWD_PAGE)
                .flatMap(new FlatMapFunction<String, ProductStats>() {
                    @Override
                    public void flatMap(String value,
                                        Collector<ProductStats> out) throws Exception {
                        JSONObject obj = JSON.parseObject(value);   //將string類型轉化為JSON類型
                        JSONObject page = obj.getJSONObject("page");    //獲取page
                        String page_id = page.getString("page_id"); //從page中獲取page_id
                        String item_type = page.getString("item_type"); //從page中獲取item_type

                        if ("good_detail".equals(page_id) && page.get("item") != null && "sku_id".equals(item_type)) {    //點擊行為
                            Long item = page.getLong("item");
                            ProductStats productStats = new ProductStats();
                            productStats.setSku_id(item);
                            productStats.setClick_ct(1L);
                            productStats.setTs(obj.getLong("ts"));
                            out.collect(productStats);
                        }
                    }
                });

        //2. 曝光
        SingleOutputStreamOperator<ProductStats> displayStatsStream = streams.get(Constant.TOPIC_DWD_DISPLAY)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);   //將string類型轉化為JSON類型
                    Long skuId = obj.getLong("item");
                    Long ts = obj.getLong("ts");

                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(skuId);
                    productStats.setTs(ts);
                    productStats.setDisplay_ct(1L);
                    return productStats;
                });

        //3.收藏
        SingleOutputStreamOperator<ProductStats> favorStatsStream = streams.get(Constant.TOPIC_DWD_FAVOR_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setFavor_ct(1L);
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //4.購物車cart
        SingleOutputStreamOperator<ProductStats> cartStatsStream = streams.get(Constant.TOPIC_DWD_CART_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setCart_ct(1L);
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //5.訂單order
        SingleOutputStreamOperator<ProductStats> orderWideStatsStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                .map(s -> {
                    OrderWide orderWide = JSON.parseObject(s, OrderWide.class); //將string類型轉為Json類型,然后封裝在OrderWide對象中
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(orderWide.getSku_id());
                    productStats.getOrderIdSet().add(orderWide.getOrder_id());
                    productStats.setTs(YuangeCommonUtil.toTs(orderWide.getCreate_time()));
                    productStats.setOrder_amount(orderWide.getSplit_total_amount());
                    productStats.setOrder_sku_num(orderWide.getSku_num());
                    return productStats;
                });

        //6.支付
        SingleOutputStreamOperator<ProductStats> paymentWideStatsStream = streams.get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                .map(s -> {
                    PaymentWide paymentWide = JSON.parseObject(s, PaymentWide.class);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(paymentWide.getSku_id());
                    productStats.setTs(YuangeCommonUtil.toTs(paymentWide.getPayment_create_time()));
                    productStats.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                    productStats.setPayment_amount(paymentWide.getSplit_total_amount());
                    return productStats;
                });

        //7.退款
        SingleOutputStreamOperator<ProductStats> refundStatsStream = streams.get(Constant.TOPIC_DWD_ORDER_REFUND_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setRefund_amount(obj.getBigDecimal("refund_amount"));
                    productStats.getRefundOrderIdSet().add(obj.getLongValue("order_id"));
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //8.評價
        SingleOutputStreamOperator<ProductStats> commentStatsStream = streams.get(Constant.TOPIC_DWD_COMMENT_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    productStats.setComment_ct(1L);
                    String appraise = obj.getString("appraise");
                    if (Constant.FOUR_STAR_COMMENT.equals(appraise) || Constant.FIVE_STAR_COMMENT.equals(appraise)) {
                        productStats.setGood_comment_ct(1L);
                    }
                    return productStats;
                });

        return clickStatsStream.union(
                displayStatsStream,
                cartStatsStream,
                favorStatsStream,
                orderWideStatsStream,
                paymentWideStatsStream,
                commentStatsStream,
                refundStatsStream
        );
    }
}

  2)啟動Hadoop

hadoop.sh start

  3)啟動ZK

zk start

  4)啟動Kafka

kafka.sh start

  5)啟動MaxWell

maxwell.sh start

  6)啟動日志服務器

log-lg.sh start

  7)啟動Hbase

start-hbase.sh

  8)啟動redis

redis.sh

  9)啟動Flink的yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

  10)提交如下程序至yarn-session中運行

  11)生產日志數據、業務數據

  12)查看Idea控制台打印的數據

3.2.6 開窗, 聚合

package com.yuange.flinkrealtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.bean.PaymentWide;
import com.yuange.flinkrealtime.bean.ProductStats;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Map;

/**
 * @作者:袁哥
 * @時間:2021/8/6 18:19
 */
public class DwsProductStatsApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwsProductStatsApp().init(
                4002,
                1,
                "DwsProductStatsApp",
                "DwsProductStatsApp",
                Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWD_DISPLAY,
                Constant.TOPIC_DWD_FAVOR_INFO, Constant.TOPIC_DWD_CART_INFO,
                Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                Constant.TOPIC_DWD_ORDER_REFUND_INFO, Constant.TOPIC_DWD_COMMENT_INFO
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        //1.把8個流union在一起
        DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
//        productStatsDataStream.print();
        //2.開窗聚合
        SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
        aggregatedStream.print();
    }

    private SingleOutputStreamOperator<ProductStats> agg(DataStream<ProductStats> productStatsDataStream) {
        return productStatsDataStream
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                            .withTimestampAssigner((ps,ts) -> ps.getTs())
                )
                .keyBy(ProductStats::getSku_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<ProductStats>() {
                            @Override
                            public ProductStats reduce(ProductStats value1, ProductStats value2) throws Exception {

                                value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                                value1.setDisplay_ct(value1.getDisplay_ct() + value2.getDisplay_ct());
                                value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                                value1.setCart_ct(value1.getCart_ct() + value2.getCart_ct());
                                value1.setComment_ct(value1.getComment_ct() + value2.getComment_ct());
                                value1.setGood_comment_ct(value1.getGood_comment_ct() + value2.getGood_comment_ct());

                                value1.setOrder_sku_num(value1.getOrder_sku_num() + value2.getOrder_sku_num());
                                value1.setOrder_amount(value1.getOrder_amount().add(value2.getOrder_amount()));

                                value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                                value1.setRefund_amount(value1.getRefund_amount().add(value2.getRefund_amount()));

                                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                                value1.getPaidOrderIdSet().addAll(value2.getPaidOrderIdSet());
                                value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());

                                return value1;
                            }
                        },
                        new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
                            SimpleDateFormat sdf;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            }

                            @Override
                            public void process(Long key,
                                                Context context,
                                                Iterable<ProductStats> elements,
                                                Collector<ProductStats> out) throws Exception {
                                TimeWindow window = context.window();
                                ProductStats productStats = elements.iterator().next();
                                productStats.setStt(sdf.format(window.getStart()));
                                productStats.setEdt(sdf.format(window.getEnd()));
                                productStats.setOrder_ct((long)productStats.getOrderIdSet().size());
                                productStats.setPaid_order_ct((long)productStats.getPaidOrderIdSet().size());
                                productStats.setRefund_order_ct((long)productStats.getRefundOrderIdSet().size());
                                out.collect(productStats);
                            }
                        }
                );
    }

    private DataStream<ProductStats> unionStreams(Map<String, DataStreamSource<String>> streams) {
        //1. 點擊
        SingleOutputStreamOperator<ProductStats> clickStatsStream = streams.get(Constant.TOPIC_DWD_PAGE)
                .flatMap(new FlatMapFunction<String, ProductStats>() {
                    @Override
                    public void flatMap(String value,
                                        Collector<ProductStats> out) throws Exception {
                        JSONObject obj = JSON.parseObject(value);   //將string類型轉化為JSON類型
                        JSONObject page = obj.getJSONObject("page");    //獲取page
                        String page_id = page.getString("page_id"); //從page中獲取page_id
                        String item_type = page.getString("item_type"); //從page中獲取item_type

                        if ("good_detail".equals(page_id) && page.get("item") != null && "sku_id".equals(item_type)) {    //點擊行為
                            Long item = page.getLong("item");
                            ProductStats productStats = new ProductStats();
                            productStats.setSku_id(item);
                            productStats.setClick_ct(1L);
                            productStats.setTs(obj.getLong("ts"));
                            out.collect(productStats);
                        }
                    }
                });

        //2. 曝光
        SingleOutputStreamOperator<ProductStats> displayStatsStream = streams.get(Constant.TOPIC_DWD_DISPLAY)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);   //將string類型轉化為JSON類型
                    Long skuId = obj.getLong("item");
                    Long ts = obj.getLong("ts");

                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(skuId);
                    productStats.setTs(ts);
                    productStats.setDisplay_ct(1L);
                    return productStats;
                });

        //3.收藏
        SingleOutputStreamOperator<ProductStats> favorStatsStream = streams.get(Constant.TOPIC_DWD_FAVOR_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setFavor_ct(1L);
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //4.購物車cart
        SingleOutputStreamOperator<ProductStats> cartStatsStream = streams.get(Constant.TOPIC_DWD_CART_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setCart_ct(1L);
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //5.訂單order
        SingleOutputStreamOperator<ProductStats> orderWideStatsStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                .map(s -> {
                    OrderWide orderWide = JSON.parseObject(s, OrderWide.class); //將string類型轉為Json類型,然后封裝在OrderWide對象中
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(orderWide.getSku_id());
                    productStats.getOrderIdSet().add(orderWide.getOrder_id());
                    productStats.setTs(YuangeCommonUtil.toTs(orderWide.getCreate_time()));
                    productStats.setOrder_amount(orderWide.getSplit_total_amount());
                    productStats.setOrder_sku_num(orderWide.getSku_num());
                    return productStats;
                });

        //6.支付
        SingleOutputStreamOperator<ProductStats> paymentWideStatsStream = streams.get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                .map(s -> {
                    PaymentWide paymentWide = JSON.parseObject(s, PaymentWide.class);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(paymentWide.getSku_id());
                    productStats.setTs(YuangeCommonUtil.toTs(paymentWide.getPayment_create_time()));
                    productStats.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                    productStats.setPayment_amount(paymentWide.getSplit_total_amount());
                    return productStats;
                });

        //7.退款
        SingleOutputStreamOperator<ProductStats> refundStatsStream = streams.get(Constant.TOPIC_DWD_ORDER_REFUND_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setRefund_amount(obj.getBigDecimal("refund_amount"));
                    productStats.getRefundOrderIdSet().add(obj.getLongValue("order_id"));
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //8.評價
        SingleOutputStreamOperator<ProductStats> commentStatsStream = streams.get(Constant.TOPIC_DWD_COMMENT_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    productStats.setComment_ct(1L);
                    String appraise = obj.getString("appraise");
                    if (Constant.FOUR_STAR_COMMENT.equals(appraise) || Constant.FIVE_STAR_COMMENT.equals(appraise)) {
                        productStats.setGood_comment_ct(1L);
                    }
                    return productStats;
                });

        return clickStatsStream.union(
                displayStatsStream,
                cartStatsStream,
                favorStatsStream,
                orderWideStatsStream,
                paymentWideStatsStream,
                commentStatsStream,
                refundStatsStream
        );
    }
}

3.2.7 補充維度信息

  1)代碼如下

package com.yuange.flinkrealtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV2;
import com.yuange.flinkrealtime.bean.OrderWide;
import com.yuange.flinkrealtime.bean.PaymentWide;
import com.yuange.flinkrealtime.bean.ProductStats;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.function.DimAsyncFunction;
import com.yuange.flinkrealtime.util.DimUtil;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @作者:袁哥
 * @時間:2021/8/6 18:19
 */
public class DwsProductStatsApp extends BaseAppV2 {

    public static void main(String[] args) {
        new DwsProductStatsApp().init(
                4002,
                1,
                "DwsProductStatsApp",
                "DwsProductStatsApp",
                Constant.TOPIC_DWD_PAGE, Constant.TOPIC_DWD_DISPLAY,
                Constant.TOPIC_DWD_FAVOR_INFO, Constant.TOPIC_DWD_CART_INFO,
                Constant.TOPIC_DWM_ORDER_WIDE, Constant.TOPIC_DWM_PAYMENT_WIDE,
                Constant.TOPIC_DWD_ORDER_REFUND_INFO, Constant.TOPIC_DWD_COMMENT_INFO
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        //1.把8個流union在一起
        DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
//        productStatsDataStream.print();
        //2.開窗聚合
        SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
//        aggregatedStream.print();
        //3.補齊維度
        SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDim(aggregatedStream);
        psStreamWithDim.print();
    }

    private SingleOutputStreamOperator<ProductStats> joinDim(SingleOutputStreamOperator<ProductStats> aggregatedStream) {
        return AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<ProductStats>() {
                    @Override
                    public void addDim(Connection phoenixConn,
                                       Jedis client,
                                       ProductStats ps,
                                       ResultFuture<ProductStats> resultFuture) throws Exception {
                        JSONObject skuInfo = DimUtil.readDim(phoenixConn, client, Constant.DIM_SKU_INFO, ps.getSku_id().toString());
                        ps.setSku_name(skuInfo.getString("SKU_NAME"));
                        ps.setSku_price(skuInfo.getBigDecimal("PRICE"));

                        ps.setSpu_id(skuInfo.getLong("SPU_ID"));
                        ps.setTm_id(skuInfo.getLong("TM_ID"));
                        ps.setCategory3_id(skuInfo.getLong("CATEGORY3_ID"));

                        JSONObject spuInfo = DimUtil.readDim(phoenixConn, client, Constant.DIM_SPU_INFO, ps.getSpu_id().toString());
                        ps.setSpu_name(spuInfo.getString("SPU_NAME"));

                        JSONObject tmInfo = DimUtil.readDim(phoenixConn, client, Constant.DIM_BASE_TRADEMARK, ps.getTm_id().toString());
                        ps.setTm_name(tmInfo.getString("TM_NAME"));

                        JSONObject c3Info = DimUtil.readDim(phoenixConn, client, Constant.DIM_BASE_CATEGORY3, ps.getCategory3_id().toString());
                        ps.setCategory3_name(c3Info.getString("NAME"));

                        resultFuture.complete(Collections.singletonList(ps));
                    }
                },
                60,
                TimeUnit.SECONDS
        );
    }

    private SingleOutputStreamOperator<ProductStats> agg(DataStream<ProductStats> productStatsDataStream) {
        return productStatsDataStream
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                            .withTimestampAssigner((ps,ts) -> ps.getTs())
                )
                .keyBy(ProductStats::getSku_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<ProductStats>() {
                            @Override
                            public ProductStats reduce(ProductStats value1, ProductStats value2) throws Exception {

                                value1.setClick_ct(value1.getClick_ct() + value2.getClick_ct());
                                value1.setDisplay_ct(value1.getDisplay_ct() + value2.getDisplay_ct());
                                value1.setFavor_ct(value1.getFavor_ct() + value2.getFavor_ct());
                                value1.setCart_ct(value1.getCart_ct() + value2.getCart_ct());
                                value1.setComment_ct(value1.getComment_ct() + value2.getComment_ct());
                                value1.setGood_comment_ct(value1.getGood_comment_ct() + value2.getGood_comment_ct());

                                value1.setOrder_sku_num(value1.getOrder_sku_num() + value2.getOrder_sku_num());
                                value1.setOrder_amount(value1.getOrder_amount().add(value2.getOrder_amount()));

                                value1.setPayment_amount(value1.getPayment_amount().add(value2.getPayment_amount()));
                                value1.setRefund_amount(value1.getRefund_amount().add(value2.getRefund_amount()));

                                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                                value1.getPaidOrderIdSet().addAll(value2.getPaidOrderIdSet());
                                value1.getRefundOrderIdSet().addAll(value2.getRefundOrderIdSet());

                                return value1;
                            }
                        },
                        new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
                            SimpleDateFormat sdf;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            }

                            @Override
                            public void process(Long key,
                                                Context context,
                                                Iterable<ProductStats> elements,
                                                Collector<ProductStats> out) throws Exception {
                                TimeWindow window = context.window();
                                ProductStats productStats = elements.iterator().next();
                                productStats.setStt(sdf.format(window.getStart()));
                                productStats.setEdt(sdf.format(window.getEnd()));
                                productStats.setOrder_ct((long)productStats.getOrderIdSet().size());
                                productStats.setPaid_order_ct((long)productStats.getPaidOrderIdSet().size());
                                productStats.setRefund_order_ct((long)productStats.getRefundOrderIdSet().size());
                                out.collect(productStats);
                            }
                        }
                );
    }

    private DataStream<ProductStats> unionStreams(Map<String, DataStreamSource<String>> streams) {
        //1. 點擊
        SingleOutputStreamOperator<ProductStats> clickStatsStream = streams.get(Constant.TOPIC_DWD_PAGE)
                .flatMap(new FlatMapFunction<String, ProductStats>() {
                    @Override
                    public void flatMap(String value,
                                        Collector<ProductStats> out) throws Exception {
                        JSONObject obj = JSON.parseObject(value);   //將string類型轉化為JSON類型
                        JSONObject page = obj.getJSONObject("page");    //獲取page
                        String page_id = page.getString("page_id"); //從page中獲取page_id
                        String item_type = page.getString("item_type"); //從page中獲取item_type

                        if ("good_detail".equals(page_id) && page.get("item") != null && "sku_id".equals(item_type)) {    //點擊行為
                            Long item = page.getLong("item");
                            ProductStats productStats = new ProductStats();
                            productStats.setSku_id(item);
                            productStats.setClick_ct(1L);
                            productStats.setTs(obj.getLong("ts"));
                            out.collect(productStats);
                        }
                    }
                });

        //2. 曝光
        SingleOutputStreamOperator<ProductStats> displayStatsStream = streams.get(Constant.TOPIC_DWD_DISPLAY)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);   //將string類型轉化為JSON類型
                    Long skuId = obj.getLong("item");
                    Long ts = obj.getLong("ts");

                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(skuId);
                    productStats.setTs(ts);
                    productStats.setDisplay_ct(1L);
                    return productStats;
                });

        //3.收藏
        SingleOutputStreamOperator<ProductStats> favorStatsStream = streams.get(Constant.TOPIC_DWD_FAVOR_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setFavor_ct(1L);
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //4.購物車cart
        SingleOutputStreamOperator<ProductStats> cartStatsStream = streams.get(Constant.TOPIC_DWD_CART_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setCart_ct(1L);
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //5.訂單order
        SingleOutputStreamOperator<ProductStats> orderWideStatsStream = streams.get(Constant.TOPIC_DWM_ORDER_WIDE)
                .map(s -> {
                    OrderWide orderWide = JSON.parseObject(s, OrderWide.class); //將string類型轉為Json類型,然后封裝在OrderWide對象中
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(orderWide.getSku_id());
                    productStats.getOrderIdSet().add(orderWide.getOrder_id());
                    productStats.setTs(YuangeCommonUtil.toTs(orderWide.getCreate_time()));
                    productStats.setOrder_amount(orderWide.getSplit_total_amount());
                    productStats.setOrder_sku_num(orderWide.getSku_num());
                    return productStats;
                });

        //6.支付
        SingleOutputStreamOperator<ProductStats> paymentWideStatsStream = streams.get(Constant.TOPIC_DWM_PAYMENT_WIDE)
                .map(s -> {
                    PaymentWide paymentWide = JSON.parseObject(s, PaymentWide.class);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(paymentWide.getSku_id());
                    productStats.setTs(YuangeCommonUtil.toTs(paymentWide.getPayment_create_time()));
                    productStats.getPaidOrderIdSet().add(paymentWide.getOrder_id());
                    productStats.setPayment_amount(paymentWide.getSplit_total_amount());
                    return productStats;
                });

        //7.退款
        SingleOutputStreamOperator<ProductStats> refundStatsStream = streams.get(Constant.TOPIC_DWD_ORDER_REFUND_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setRefund_amount(obj.getBigDecimal("refund_amount"));
                    productStats.getRefundOrderIdSet().add(obj.getLongValue("order_id"));
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    return productStats;
                });

        //8.評價
        SingleOutputStreamOperator<ProductStats> commentStatsStream = streams.get(Constant.TOPIC_DWD_COMMENT_INFO)
                .map(s -> {
                    JSONObject obj = JSON.parseObject(s);
                    ProductStats productStats = new ProductStats();
                    productStats.setSku_id(obj.getLong("sku_id"));
                    productStats.setTs(YuangeCommonUtil.toTs(obj.getString("create_time")));
                    productStats.setComment_ct(1L);
                    String appraise = obj.getString("appraise");
                    if (Constant.FOUR_STAR_COMMENT.equals(appraise) || Constant.FIVE_STAR_COMMENT.equals(appraise)) {
                        productStats.setGood_comment_ct(1L);
                    }
                    return productStats;
                });

        return clickStatsStream.union(
                displayStatsStream,
                cartStatsStream,
                favorStatsStream,
                orderWideStatsStream,
                paymentWideStatsStream,
                commentStatsStream,
                refundStatsStream
        );
    }
}

  2)Idea中啟動該程序

  3)生產日志數據和業務數據

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  4)查看Idea控制台

3.2.5 寫入到ClickHouse

  1)在ClickHouse中創建主題寬表

use flinkdb;
create table product_stats_2021 (
   stt DateTime,
   edt DateTime,
   sku_id  UInt64,
   sku_name String,
   sku_price Decimal64(2),
   spu_id UInt64,
   spu_name String ,
   tm_id UInt64,
   tm_name String,
   category3_id UInt64,
   category3_name String ,
   display_ct UInt64,
   click_ct UInt64,
   favor_ct UInt64,
   cart_ct UInt64,
   order_sku_num UInt64,
   order_amount Decimal64(2),
   order_ct UInt64 ,
   payment_amount Decimal64(2),
   paid_order_ct UInt64,
   refund_order_ct UInt64,
   refund_amount Decimal64(2),
   comment_ct UInt64,
   good_comment_ct UInt64 ,
   ts UInt64
)engine =ReplacingMergeTree( ts)
        partition by  toYYYYMMDD(stt)
        order by   (stt,edt,sku_id );

  2)寫數據到ClickHouse

    (1)在Constant中添加常量

public static final String TABLE_PRODUCT_STATS = "product_stats_2021";

    (2)承接上面代碼,將其補充完整

@Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        //1.把8個流union在一起
        DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
//        productStatsDataStream.print();
        //2.開窗聚合
        SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
//        aggregatedStream.print();
        //3.補齊維度
        SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDim(aggregatedStream);
//        psStreamWithDim.print();
        //4.寫入到ClickHouse中
        writeToClickHouse(psStreamWithDim);
    }

    private void writeToClickHouse(SingleOutputStreamOperator<ProductStats> psStreamWithDim) {
        psStreamWithDim.addSink(
                FlinkSinkUtil.getClickHouseSink(
                        Constant.CLICKHOUSE_DB,
                        Constant.TABLE_PRODUCT_STATS,
                        ProductStats.class
                )
        );
    }

    (3)在Idea中啟動程序

    (4)生產日志數據和業務數據

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

    (5)查看clickhouse中是否有數據

select * from product_stats_2021;

  3)使用Maven將程序打包上傳至Linux

  4)修改realtime.sh腳本,將DwsProductStatsApp 提交至yarn-session上運行

#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
        #com.yuange.flinkrealtime.app.dwm.DwmUvApp
        #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
        com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
        com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
        #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
        com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
        #com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

  5)提交程序,運行realtime.sh腳本

  6)生產日志數據和業務數據

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  7)查看clickhouse中是否有數據增加

第4章 DWS層:地區主題寬表

  前面的所有操作都是使用的StreamApi來完成的, 這次我們采用FlinkSql來完成.

4.1 需求分析與思路

  1)定義Table流環境

  2)把數據源定義為動態表

  3)通過SQL查詢出結果表

  4)把結果表轉換為數據流

  5)把數據流寫入目標數據庫

  6)直接把結果表的數據寫入到ClickHouse中

  如果是Flink官方支持的數據庫,也可以直接把目標數據表定義為動態表,用insert into 寫入。由於ClickHouse目前官方沒有支持的jdbc連接器(目前支持Mysql、 PostgreSQL、Derby)。阿里雲有實現好的connector, 我們使用這個connector.參考地址: https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.574.d9c541ea3J78mc

4.2 功能實現

  數據來源於topic: dwm_order_wide

4.2.1 導入ClickHouse連接器(此步驟直接跳過即可,因為我們下面自定義連接器)

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.11.0</version>
</dependency>

  注意: 

    1)由於該連接器目前在遠程的maven倉庫中找不到, 我們需要下載該連接器, 然后安裝到本地倉庫使用

    2)下載地址: https://clickhouse-release-open-access.oss-cn-shanghai.aliyuncs.com/doc-data/flink-connector-clickhouse-1.11.0.jar?spm=a2c4g.11186623.2.6.d9c541ea3J78mc&file=flink-connector-clickhouse-1.11.0.jar 

    3)假設jar包被下載在如下目錄: C:\Users\lzc\Desktop\connector

    4)安裝jar到本地倉庫(需要保證mvn命令已經配置到了path中):

mvn install:install-file -Dfile=C:\Users\lzc\Desktop\connector\flink-connector-clickhouse-1.11.0.jar -DgroupId=com.aliyun -DartifactId=flink-connector-clickhouse -Dversion=1.11.0 -Dpackaging=jar

4.2.2 ClickHouse中創建表

use flinkdb;
create table province_stats_2021 (
   stt DateTime,
   edt DateTime,
   province_id  UInt64,
   province_name String,
   area_code String ,
   iso_code String,
   iso_3166_2 String , 
   order_amount Decimal64(2),
   order_count UInt64, 
   ts UInt64
)engine =ReplacingMergeTree( ts)
        partition by  toYYYYMMDD(stt)
        order by   (stt,edt,province_id );

4.2.3 編寫抽象類

package com.yuange.flinkrealtime.app;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @時間:2021/8/6 21:01
 */
public abstract class BaseSqlApp {

    public void init(int port, int p, String ck){
        System.setProperty("HADOOP_USER_NAME","atguigu");
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",port);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(p);

        environment.enableCheckpointing(5000);  //檢查點之間的時間間隔,單位是毫秒
        environment.setStateBackend(new HashMapStateBackend()); //定義狀態后端,以保證將檢查點狀態寫入遠程(HDFS)
        environment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/flinkparent/ck/" + ck);   //配置檢查點存放地址

        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //設置檢查點模式:精准一次
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);   //設置檢查點失敗時重試次數
        environment.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  //設置檢查點持久化:取消作業時保留外部化檢查點

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        run(tableEnvironment);

        try {
            environment.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected abstract void run(StreamTableEnvironment tableEnvironment);
}

4.2.4 封裝數據的對象

package com.yuange.flinkrealtime.bean;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**
 * @作者:袁哥
 * @時間:2021/8/6 21:23
 */
@Data
@NoArgsConstructor
public class ProvinceStats {
    private String stt;
    private String edt;
    private Long province_id;
    private String province_name;
    private String area_code;
    private String iso_code;
    private String iso_3166_2;
    private BigDecimal order_amount;
    private Long order_count;
    private Long ts;
}

4.2.5 Constant中添加常量

public static final String TABLE_PROVINCE_STATS = "province_stats_2021";

4.2.6 具體實現代碼

package com.yuange.flinkrealtime.app.dws;

import com.yuange.flinkrealtime.app.BaseSqlApp;
import com.yuange.flinkrealtime.bean.ProvinceStats;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @時間:2021/8/6 21:01
 */
public class DwsProvinceStatsApp extends BaseSqlApp {

    public static void main(String[] args) {
        new DwsProvinceStatsApp().init(
                4003,
                1,
                "DwsProvinceStatsApp"
        );
    }

    @Override
    protected void run(StreamTableEnvironment tableEnvironment) {
        // 1. 使用ddl建動態表, 與kafka的topic進行關聯  dwm_order_wide
        tableEnvironment.executeSql("create table order_wide(" +
                "   province_id bigint, " +
                "   province_name string, " +
                "   province_area_code string, " +
                "   province_iso_code string, " +
                "   province_3166_2_code string, " +
                "   order_id bigint, " +
                "   split_total_amount decimal(20,2), " +
                "   create_time string, " +
                "   et as to_timestamp(create_time), " +
                "   watermark for et as et - interval '20' second " +
                ")with(" +
                "   'connector' = 'kafka', " +
                "   'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092, hadoop164:9092', " +
                "   'properties.group.id' = 'DwsProvinceStatsApp', " +
                "   'topic' = '" + Constant.TOPIC_DWM_ORDER_WIDE + "', " +
                "   'scan.startup.mode' = 'earliest-offset', " +
                "   'format' = 'json' " +
                ")"
        );

        //2.在order_wide這張表執行連續查詢
        Table table = tableEnvironment.sqlQuery("select" +
                "   date_format(tumble_start(et,interval '5' second), 'yyyy-MM-dd HH:mm:ss') stt, " +
                "   date_format(tumble_end(et,interval '5' second), 'yyyy-MM-dd HH:mm:ss') edt, " +
                "   province_id, " +
                "   province_name, " +
                "   province_area_code area_code, " +
                "   province_iso_code iso_code, " +
                "   province_3166_2_code iso_3166_2, " +
                "   sum(split_total_amount) order_amount, " +
                "   count(distinct(order_id)) order_count, " +
                "   unix_timestamp() * 1000 ts " +
                " from order_wide " +
                " group by " +
                "   province_id, " +
                "   province_name, " +
                "   province_area_code, " +
                "   province_iso_code, " +
                "   province_3166_2_code, " +
                "   tumble(et,interval '5' second)" +
                "");
        //3.寫入到ClickHouse中
        tableEnvironment.toRetractStream(table, ProvinceStats.class)
                .filter(t -> t.f0)  //過濾掉flase開頭的數據
                .map(t -> t.f1) //返回我們想要的數據
                .addSink(
                        FlinkSinkUtil.getClickHouseSink(
                                    Constant.CLICKHOUSE_DB,
                                    Constant.TABLE_PROVINCE_STATS,
                                    ProvinceStats.class
                                )
                );
    }
}

4.2.7 修改FlinkSinkUtil中代碼,將getClickHouseSink和getJdbcSink修改一下

public static <T> SinkFunction<T> getClickHouseSink(String db,
                                                        String table,
                                                        Class<T> tClass) {
        Field[] fields = tClass.getDeclaredFields();

        String clickhouseDriver = Constant.CLICKHOUSE_DRIVER;
        String url = Constant.CLICKHOUSE_URL_PRE + "/" + db;

        StringBuilder sql = new StringBuilder();
        sql.append("insert into ")
                .append(table)
                .append("(");
        //找到字段名,如果這個字段有NoSink這個注解,則不要拼接
        for (Field field : fields) {
            NoSink noSink = field.getAnnotation(NoSink.class);
            if (noSink == null){
                sql.append(field.getName())
                        .append(",");
            }
        }
        sql.deleteCharAt(sql.length() - 1); //把最后一個逗號刪除
        sql.append(") values(");
        for (Field field : fields) {
            NoSink noSink = field.getAnnotation(NoSink.class);
            if (noSink == null) {
                sql.append("?,");
            }
        }
        sql.deleteCharAt(sql.length() - 1);
        sql.append(")");

//        System.out.println(sql.toString());
        //借助jdbc sink封裝一個ClickHouse sink
        return getJdbcSink(
                url,
                clickhouseDriver,
                sql.toString()
        );
    }

    private static <T> SinkFunction<T> getJdbcSink(String url, String clickhouseDriver, String sql) {
        return JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T t) throws SQLException {
                        // 類名.class   對象.getClass()  Class.forName("...")
                        Class<?> tClass = t.getClass();
                        try {
                            Field[] fields = tClass.getDeclaredFields();
                            for (int i = 0, position = 1; i < fields.length; i++) {

                                Field field = fields[i];
                                NoSink noSink = field.getAnnotation(NoSink.class);
                                if (noSink == null) {
                                    field.setAccessible(true);
                                    Object v = field.get(t);
                                    ps.setObject(position++, v);
                                }
                            }
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchIntervalMs(100)   //100毫秒處理一次數據
                        .withMaxRetries(3)  //失敗后最大重試次數
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(clickhouseDriver)
                        .build()
        );
    }

4.2.8 測試

  1)將程序打包上傳至Linux

  2)啟動hadoop

hadoop.sh start

  3)啟動ZK

zk start

  4)啟動Kafka

kafka.sh start

  5)啟動maxwell

maxwell.sh start

  6)啟動redis

redis.sh 

  7)啟動HBase

start-hbase.sh

  8)修改realtime.sh腳本

#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        #com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
        #com.yuange.flinkrealtime.app.dwm.DwmUvApp
        #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
        com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
        #com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
        #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
     #com.yuange.flinkrealtime.app.dws.DwsProductStatsApp com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp )
for app in ${apps[*]} ; do $flink run -d -c $app $jar done

  9)啟動yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

  10)提交realtime.sh腳本,將程序提交至yarn-session上運行

realtime.sh

  11)啟動Clickchouse

sudo systemctl start clickhouse-server

  12)生產業務數據

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

  13)查看Clickhouse是否生成數據

select count(*) from province_stats_2021;

第5章 DWS層:搜索關鍵詞主題寬表

5.1 需求分析與思路

  關鍵詞主題這個主要是為了大屏展示中的字符雲的展示效果,用於感性的讓大屏觀看者感知目前的用戶都更關心的那些商品和關鍵詞。關鍵詞的展示也是一種維度聚合的結果,根據聚合的大小來決定關鍵詞的大小。關鍵詞的第一重要來源的就是用戶在搜索欄的搜索,另外就是從以商品為主題的統計中獲取關鍵詞

5.2 關於分詞

  因為無論是從用戶的搜索欄中,還是從商品名稱中文字都是可能是比較長的,且由多個關鍵詞組成,如下圖:

  所以我們需要根據把長文本分割成一個一個的詞,這種分詞技術,在搜索引擎中可能會用到。對於中文分詞,現在的搜索引擎基本上都是使用的第三方分詞器,咱們在計算數據中也可以,使用和搜索引擎中一致的分詞器,IK

5.3 搜索關鍵詞功能實現

5.3.1 導入IK分詞器依賴

<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>

5.3.2 封裝分詞工具類

package com.yuange.flinkrealtime.util;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;

/**
 * @作者:袁哥
 * @時間:2021/8/7 11:42
 */
public class IkUtil {
    public static void main(String[] args) {
        System.out.println(analyzer("我是中國人"));
    }

    public static Set<String> analyzer(String text) {
        //把字符串變成,內存流
        StringReader reader = new StringReader(text);
        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
        HashSet<String> hashSet = new HashSet<>();

        try {
            Lexeme next = ikSegmenter.next();
            while (next != null) {
                String lexemeText = next.getLexemeText();
                hashSet.add(lexemeText);
                next = ikSegmenter.next();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return hashSet;
    }
}

5.4 自定義函數

  有了分詞器,那么另外一個要考慮的問題就是如何把分詞器的使用揉進FlinkSQL中。因為SQL的語法和相關的函數都是Flink內定的,想要使用外部工具,就必須結合自定義函數

5.4.1 Flink自定義函數概述

  自定義函數(UDF)是一種擴展開發機制,可以用來在查詢語句里調用難以用其他方式表達的頻繁使用或自定義的邏輯。

  函數分類:

    1)Scalar functions:類似於spark的udf

    2)Table functions:類似於 sparkudtf

    3)Aggregate functions:類似於spark的udaf

    4)Table aggregate functions

    5)Async table functins

  考慮到一個詞條包括多個詞語所以分詞是指上是一種一對多的拆分,一拆多的情況,我們應該選擇Table Function

5.4.2 實現自定義的Table Function

  1)參考: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/

  2)代碼

package com.yuange.flinkrealtime.function;

import com.yuange.flinkrealtime.util.IkUtil;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Set;

/**
 * @作者:袁哥
 * @時間:2021/8/7 11:32
 */
@FunctionHint(output = @DataTypeHint("row<word string>"))   //輸出數據的類型
public class KWUdtf extends TableFunction<Row> {
    public void eval(String kw){
        Set<String> words = IkUtil.analyzer(kw);
        for (String word : words) {
            collect(Row.of(word));
        }
    }
}

5.5 主題寬表具體實現代碼

5.5.1 ClickHouse中建表

use flinkdb;
create table keyword_stats_2021 (
    stt DateTime,
    edt DateTime,
    keyword String ,
    source String ,
    ct UInt64 ,
    ts UInt64
)engine =ReplacingMergeTree( ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,keyword,source );

5.5.2 代碼清單

  1)封裝數據的POJO

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @作者:袁哥
 * @時間:2021/8/7 15:40
 */
@AllArgsConstructor
@Data
@NoArgsConstructor
public class KeywordStats {
    private String stt;
    private String edt;
    private String keyword;
    private String source;
    private Long ct;
    private Long ts;
}

  2)在Constant中新建常量

public static final String TABLE_KEYWORD_STATS = "keyword_stats_2021";

  3)DwsKeyWordSearchStatsApp代碼

package com.yuange.flinkrealtime.app.dws;

import com.yuange.flinkrealtime.app.BaseSqlApp;
import com.yuange.flinkrealtime.bean.KeywordStats;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.function.KWUdtf;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @時間:2021/8/7 11:19
 */
public class DwsKeyWordSearchStatsApp extends BaseSqlApp {

    public static void main(String[] args) {
        new DwsKeyWordSearchStatsApp().init(
                4004,
                1,
                "DwsKeyWordSearchStatsApp"
        );
    }

    @Override
    protected void run(StreamTableEnvironment tableEnvironment) {
        tableEnvironment.executeSql("create table page_log(" +
                "   common map<string,string>, " +
                "   page map<string,string>, " +
                "   ts bigint, " +
                "   et as to_timestamp(from_unixtime(ts/1000)), " +
                "   watermark for et as et - interval '10' second " +
                ")with(" +
                "   'connector' = 'kafka', " +
                "   'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092, hadoop164:9092', " +
                "   'properties.group.id' = 'DwsKeyWordSearchStatsApp', " +
                "   'topic' = '" + Constant.TOPIC_DWD_PAGE + "', " +
                "   'scan.startup.mode' = 'earliest-offset', " +
                "   'format' = 'json' " +
                ")");

        //1. 過濾出搜索記錄
        Table table = tableEnvironment.sqlQuery("select" +
                " page['item'] keyword, " +
                " et " +
                "from page_log " +
                "where page['page_id'] = 'good_list' and page['item'] is not null");
        tableEnvironment.createTemporaryView("t1",table);

        //2.注冊自定義函數
        tableEnvironment.createTemporaryFunction("ik_analyzer", KWUdtf.class);

        Table table2 = tableEnvironment.sqlQuery("select" +
                " word, " +
                " et " +
                "from t1 " +
                "join lateral table(ik_analyzer(keyword)) on true");
        tableEnvironment.createTemporaryView("t2",table2);

        //3.統計每個窗口,每個關鍵詞搜索的次數
        Table table3 = tableEnvironment.sqlQuery("select" +
                " date_format(tumble_start(et,interval '5' second),'yyyy-MM-dd HH:mm:ss') stt, " +
                " date_format(tumble_end(et,interval '5' second),'yyyy-MM-dd HH:mm:ss') edt, " +
                " word keyword, " +
                " 'search' source, " +
                " count(*) ct, " +
                " unix_timestamp() * 1000 ts " +
                " from t2 " +
                " group by word, tumble(et,interval '5' second)");

        //4.寫入到ClickHouse中
        tableEnvironment.toRetractStream(table3, KeywordStats.class)
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .addSink(FlinkSinkUtil.getClickHouseSink(Constant.CLICKHOUSE_DB,Constant.TABLE_KEYWORD_STATS,KeywordStats.class));
    }
}

5.5.3 測試

  1)將程序打包上傳至Linux

  2)啟動hadoop

hadoop.sh start

  3)啟動ZK

zk start

  4)啟動Kafka

kafka.sh start

  5)啟動日志服務器

log-lg.sh start

  6)修改realtime.sh腳本

#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        com.yuange.flinkrealtime.app.dwd.DwdLogApp
        #com.yuange.flinkrealtime.app.dwd.DwdDbApp
        #com.yuange.flinkrealtime.app.dwm.DwmUvApp
        #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
        #com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
        #com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
        #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
        #com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
        #com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
        com.yuange.flinkrealtime.app.dws.DwsKeyWordSearchStatsApp
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

  7)啟動yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

  8)運行腳本,提交程序至yarn-session上運行

realtime.sh

  9)生產日志數據

/opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  10)查看Clickhouse中是否有數據

select count(*) from keyword_stats_2021;

第6章 DWS層:商品行為關鍵詞主題寬表

  從商品主題獲得,商品關鍵詞與點擊次數、訂單次數、添加購物次數的統計表。

6.1 重構DWSProductStatsApp

  重構DWSProductStatsApp, 增加最終的數據Kafka的代碼

@Override
    protected void run(StreamExecutionEnvironment environment, Map<String, DataStreamSource<String>> streams) {
        //1.把8個流union在一起
        DataStream<ProductStats> productStatsDataStream = unionStreams(streams);
//        productStatsDataStream.print();
        //2.開窗聚合
        SingleOutputStreamOperator<ProductStats> aggregatedStream = agg(productStatsDataStream);
//        aggregatedStream.print();
        //3.補齊維度
        SingleOutputStreamOperator<ProductStats> psStreamWithDim = joinDim(aggregatedStream);
//        psStreamWithDim.print();
        //4.寫入到ClickHouse中
        writeToClickHouse(psStreamWithDim);
        // 5. 寫入到Kafka中 為了給產品關鍵詞准備數據
        writeToKafka(psStreamWithDim);
    }

    private void writeToKafka(SingleOutputStreamOperator<ProductStats> stream) {
        stream
                .map(JSON::toJSONString)
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWS_PRODUCT_STATS));
    }

6.2 自定義UDTF函數

  實現點擊次數、訂單次數、添加購物次數的統計

package com.yuange.flinkrealtime.function;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * @作者:袁哥
 * @時間:2021/8/7 16:29
 */
@FunctionHint(output = @DataTypeHint("row<source string, ct bigint>"))
public class KWProductUdtf extends TableFunction<Row> {
    public void eval(Long click_ct, Long cart_ct, Long order_ct) {
        if (click_ct > 0) {
            collect(Row.of("click", click_ct));
        }

        if (cart_ct > 0) {
            collect(Row.of("cart", cart_ct));
        }

        if (order_ct > 0) {
            collect(Row.of("order", order_ct));
        }
    }
}

6.3 具體實現代碼

  1)在Constant中添加常量

public static final String TOPIC_DWS_PRODUCT_STATS = "dws_product_stats";

  2)數據仍然寫入到 關鍵詞主題表: keyword_stats_2021

package com.yuange.flinkrealtime.app.dws;

import com.yuange.flinkrealtime.app.BaseSqlApp;
import com.yuange.flinkrealtime.bean.KeywordStats;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.function.KWProductUdtf;
import com.yuange.flinkrealtime.function.KWUdtf;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @時間:2021/8/7 16:20
 */
public class DwsKeyWordProductStatsApp extends BaseSqlApp {

    public static void main(String[] args) {
        new DwsKeyWordProductStatsApp().init(
                4005,
                1,
                "DwsKeyWordProductStatsApp"
        );
    }

    @Override
    protected void run(StreamTableEnvironment tableEnvironment) {
        tableEnvironment.executeSql("create table product_stats(" +
                "   stt string, " +
                "   edt string, " +
                "   sku_name string, " +
                "   click_ct bigint, " +
                "   cart_ct bigint, " +
                "   order_ct bigint " +
                ")with(" +
                "   'connector' = 'kafka', " +
                "   'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092, hadoop164:9092', " +
                "   'properties.group.id' = 'DwsKeyWordProductStatsApp', " +
                "   'topic' = '" + Constant.TOPIC_DWS_PRODUCT_STATS + "', " +
                "   'scan.startup.mode' = 'latest-offset', " +
                "   'format' = 'json' " +
                ")");

        // 1. 過濾出來有效數據
        Table table = tableEnvironment.sqlQuery("select * " +
                "from product_stats " +
                "where click_ct > 0 " +
                "or cart_ct > 0 " +
                "or order_ct > 0");
        tableEnvironment.createTemporaryView("t1",table);

        tableEnvironment.createTemporaryFunction("ik_analyzer", KWUdtf.class);

        // 2. 對關鍵詞進行分詞
        Table table2 = tableEnvironment.sqlQuery("select " +
                " stt,edt,word, " +
                " sum(click_ct) click_ct, " +
                " sum(cart_ct) cart_ct, " +
                " sum(order_ct) order_ct " +
                " from (select " +
                " stt, " +
                " edt, " +
                " word, " +
                " click_ct, " +
                " cart_ct, " +
                " order_ct " +
                "from t1, " +
                " lateral table(ik_analyzer(sku_name))) t " +
                " group by stt,edt,word");
        tableEnvironment.createTemporaryView("t2",table2);

        // 3. 把3個指標分別定位到3行
        tableEnvironment.createTemporaryFunction("kw_product", KWProductUdtf.class);

        Table table3 = tableEnvironment.sqlQuery("select " +
                "   stt, " +
                "   edt, " +
                "   word keyword, " +
                "   source, " +
                "   ct, " +
                "   unix_timestamp() * 1000 ts " +
                "from t2 " +
                "join lateral table(kw_product(click_ct, cart_ct, order_ct)) on true");

        tableEnvironment.toRetractStream(table3, KeywordStats.class)
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .addSink(
                        FlinkSinkUtil.getClickHouseSink(
                                Constant.CLICKHOUSE_DB,
                                Constant.TABLE_KEYWORD_STATS,
                                KeywordStats.class
                        )
                );
    }
}

6.4 測試

  1)將程序打包上傳至Linux

  2)啟動hadoop

hadoop.sh start

  3)啟動ZK

zk start

  4)啟動Kafka

kafka.sh start

  5)啟動日志服務器

log-lg.sh start

  6)啟動Redis

redis.sh 

  7)啟動Maxwell

maxwell.sh start

  8)啟動HBase

start-hbase.sh

  9)啟動ClickHouse

sudo systemctl start clickhouse-server

  10)啟動yarn-session

/opt/module/flink-yarn/bin/yarn-session.sh -d

  11)修改realtime.sh啟動腳本

#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
        #com.yuange.flinkrealtime.app.dwm.DwmUvApp
        #com.yuange.flinkrealtime.app.dwm.DwmJumpDetailApp_Two
        com.yuange.flinkrealtime.app.dwm.DwmOrderWideApp_Cache_Async
        com.yuange.flinkrealtime.app.dwm.DwmPaymentWideApp
        #com.yuange.flinkrealtime.app.dws.DwsVisitorStatsApp
        com.yuange.flinkrealtime.app.dws.DwsProductStatsApp
        #com.yuange.flinkrealtime.app.dws.DwsProvinceStatsApp
        #com.yuange.flinkrealtime.app.dws.DwsKeyWordSearchStatsApp
        com.yuange.flinkrealtime.app.dws.DwsKeyWordProductStatsApp
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done

  12)運行腳本,將程序提交至yarn-session上運行

realtime.sh

  13)生產日志數據和業務數據

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  14)查看ClickHouse中是否有新增數據

第7章 DWS層:總結

  1)DWS層主要是基於DWD和DWM層的數據進行輕度聚合統計。

  2)掌握利用union操作實現多流的合並

  3)掌握窗口聚合操作

  4)掌握對clickhouse數據庫的寫入操作

  5)掌握用FlinkSQL實現業務

  6)掌握分詞器的使用

  7)掌握在FlinkSQL中自定義函數的使用

  8)截止至目前,所有的數據走向流程圖如下圖,高清版鏈接:https://www.processon.com/view/link/6111ce5a0e3e7407d391cbf5


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM