Flink Real-Time Data Warehouse (DWD Layer)


Chapter 1 Requirements Analysis and Implementation Approach

1.1 Real-time data warehouse layering

  As discussed when the real-time data warehouse concept was introduced, the main purpose of building a real-time warehouse is to improve the reusability of computed data: when a new statistical requirement arrives, we do not start again from the raw data but continue from semi-finished results. In this chapter we read user-behavior logs and business data from the Kafka ODS layer, apply light processing, and write the results back to Kafka as the DWD layer.

1.2 Responsibilities of each layer

| Layer | Data description | Generated / computed by | Storage |
| ODS | Raw data: logs and business data | Log server, Maxwell | Kafka |
| DWD | Data split by business object, e.g. orders, page visits | Flink | Kafka |
| DWM | Further processing of selected objects, e.g. unique visitors, bounce behavior; still detail-level data, with dimension columns denormalized (wide tables) | Flink | Kafka |
| DIM | Dimension data | Flink | HBase |
| DWS | Fact data lightly aggregated by dimension theme into theme-wide tables | Flink | ClickHouse |
| ADS | Data in ClickHouse filtered and aggregated as needed for visualization | ClickHouse SQL | Visualization layer |

Chapter 2 Setting Up the Flink Compute Environment

2.1 Create the real-time computation module

2.2 Add the required dependencies

<properties>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop.version>3.1.3</hadoop.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <!-- commons-beanutils is a toolkit from the Apache project for working with Java Beans;
             it makes it easy to read and write bean properties. -->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.3 Add log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4 Create the package structure

| Package | Purpose |
| app | Flink jobs that produce the data of each layer |
| bean | Data objects (POJOs) |
| common | Shared constants |
| util | Utility classes |
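
For reference, the resulting layout under the base package used in the code below (com.yuange.flinkrealtime) looks roughly like the sketch that follows; the sink package only appears once the HBase sink is added in Chapter 4.

com.yuange.flinkrealtime
├── app        BaseAppV1 and the DWD applications (DwdLogApp, DwdDbApp)
├── bean       TableProcess and other data objects
├── common     Constant
├── sink       PhoenixSink (Chapter 4)
└── util       FlinkSourceUtil, FlinkSinkUtil, JdbcUtil, YuangeCommonUtil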

Chapter 3 DWD: User Behavior Logs

  The log data collected earlier is already stored in Kafka and serves as the ODS layer for logs. The log data read from the Kafka ODS layer falls into three categories: page logs, start-up logs, and display (exposure) logs. Although all three are user-behavior data, their structures are completely different, so they must be split and processed separately. The split logs are written back to different Kafka topics, forming the log DWD layer.

  In the implementation below, start-up logs go to the main stream, while page logs and display logs each go to their own side-output stream.

3.1 Log formats

  1) Page log format

{
    "common":{
        "ar":"110000",
        "ba":"Xiaomi",
        "ch":"xiaomi",
        "is_new":"0",
        "md":"Xiaomi 9",
        "mid":"mid_20",
        "os":"Android 11.0",
        "uid":"47",
        "vc":"v2.1.134"
    },
    "page":{
        "during_time":13968,
        "last_page_id":"home",
        "page_id":"search"
    },
    "ts":1614575952000
}

  2) Start-up log format

{
    "common":{
        "ar":"110000",
        "ba":"iPhone",
        "ch":"Appstore",
        "is_new":"0",
        "md":"iPhone 8",
        "mid":"mid_19",
        "os":"iOS 13.3.1",
        "uid":"50",
        "vc":"v2.1.134"
    },
    "start":{
        "entry":"notice",
        "loading_time":9286,
        "open_ad_id":15,
        "open_ad_ms":6825,
        "open_ad_skip_ms":0
    },
    "ts":1614575950000
}

  3) Display (exposure) log format

{
    "common":{
        "ar":"110000",
        "ba":"iPhone",
        "ch":"Appstore",
        "is_new":"0",
        "md":"iPhone 8",
        "mid":"mid_19",
        "os":"iOS 13.3.1",
        "uid":"50",
        "vc":"v2.1.134"
    },
    "displays":[
        {
            "display_type":"activity",
            "item":"2",
            "item_type":"activity_id",
            "order":1,
            "pos_id":4
        },
        {
            "display_type":"activity",
            "item":"2",
            "item_type":"activity_id",
            "order":2,
            "pos_id":4
        },
        {
            "display_type":"promotion",
            "item":"4",
            "item_type":"sku_id",
            "order":3,
            "pos_id":5
        },
        {
            "display_type":"query",
            "item":"6",
            "item_type":"sku_id",
            "order":4,
            "pos_id":1
        },
        {
            "display_type":"promotion",
            "item":"3",
            "item_type":"sku_id",
            "order":5,
            "pos_id":5
        },
        {
            "display_type":"query",
            "item":"2",
            "item_type":"sku_id",
            "order":6,
            "pos_id":2
        },
        {
            "display_type":"query",
            "item":"7",
            "item_type":"sku_id",
            "order":7,
            "pos_id":3
        },
        {
            "display_type":"query",
            "item":"3",
            "item_type":"sku_id",
            "order":8,
            "pos_id":4
        },
        {
            "display_type":"query",
            "item":"9",
            "item_type":"sku_id",
            "order":9,
            "pos_id":1
        },
        {
            "display_type":"promotion",
            "item":"3",
            "item_type":"sku_id",
            "order":10,
            "pos_id":5
        },
        {
            "display_type":"query",
            "item":"8",
            "item_type":"sku_id",
            "order":11,
            "pos_id":2
        }
    ],
    "page":{
        "during_time":8319,
        "page_id":"home"
    },
    "ts":1614575950000
}

3.2 Main tasks

  1) Identify new vs. returning visitors

    The client itself carries a new/old-user flag, but it is not accurate enough, so the real-time job re-confirms it (no business action is taken; this is purely a status correction).

  2) Split the data

  3) Write the different log types to different Kafka topics (the DWD layer)

3.3 Implementation code listings

3.3.1 Kafka utility classes

  1)FlinkSourceUtil

package com.yuange.flinkrealtime.util;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author 袁哥
 * @date 2021/7/28 18:42
 */
public class FlinkSourceUtil {

    public static FlinkKafkaConsumer<String> getKafkaSource(String groupId, String topic){
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        properties.setProperty("group.id", groupId);
        //If this consumer group has no committed offset for the topic, start from the position configured here ("latest");
        //if a committed offset exists, resume from it.
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("isolation.level", "read_committed");

        return new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}

  2)FlinkSinkUtil

package com.yuange.flinkrealtime.util;

import lombok.SneakyThrows;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

/**
 * @author 袁哥
 * @date 2021/7/28 18:33
 */
public class FlinkSinkUtil {

    public static FlinkKafkaProducer<String> getKafkaSink(final String topic){
        Properties conf = new Properties();
        conf.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        conf.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");

        return new FlinkKafkaProducer<String>(
                "default",
                new KafkaSerializationSchema<String>() {
                    @SneakyThrows
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return new ProducerRecord<byte[], byte[]>(topic,null,s.getBytes("utf-8"));
                    }
                },
                conf,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    };
}
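
  A note on the EXACTLY_ONCE semantic used above: the producer's transaction.timeout.ms (set to 15 minutes here) must not exceed the broker's transaction.max.timeout.ms, otherwise the sink fails when the job starts. The Kafka broker default is already 15 minutes, so the setting above just fits; if longer transactions were needed, the broker limit would have to be raised first, for example with an (illustrative) server.properties entry like the one below:

# Kafka broker side (server.properties) - only needed if the producer transaction timeout is raised above 15 minutes
transaction.max.timeout.ms=3600000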

  3)YuangeCommonUtil

package com.yuange.flinkrealtime.util;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/28 18:48
 */
public class YuangeCommonUtil {

    public static<T> List<T> toList(Iterable<T> it){
        List<T> list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
        }
        return list;
    }
}

3.3.2 A reusable base app for consuming Kafka data

package com.yuange.flinkrealtime.app;

import com.yuange.flinkrealtime.util.FlinkSourceUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 袁哥
 * @date 2021/7/28 18:53
 */
public abstract class BaseAppV1 {

    public void init(int port, int p, String ck, String groupId, String topic){
        System.setProperty("HADOOP_USER_NAME","atguigu");
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",port);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration).setParallelism(p);

        environment.enableCheckpointing(5000);  //interval between checkpoints, in milliseconds
        environment.setStateBackend(new HashMapStateBackend()); //state backend; checkpoint state is persisted to remote storage (HDFS)
        environment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/FlinkParent/ck/" + ck);   //where checkpoints are stored

        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //checkpointing mode: exactly-once
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);   //allow at most one checkpoint in flight at a time
        environment.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  //retain externalized checkpoints when the job is cancelled
        DataStreamSource<String> sourceStream = environment.addSource(FlinkSourceUtil.getKafkaSource(groupId, topic));

        run(environment,sourceStream);

        try {
            environment.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected abstract void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream);
}

3.3.3 Constants

package com.yuange.flinkrealtime.common;

/**
 * @author 袁哥
 * @date 2021/7/28 19:31
 */
public class Constant {
    public static final String TOPIC_ODS_LOG = "ods_log";
    public static final String TOPIC_ODS_DB = "ods_db";
    public static final String TOPIC_DWD_START = "dwd_start";
    public static final String TOPIC_DWD_PAGE = "dwd_page";
    public static final String TOPIC_DWD_DISPLAY = "dwd_display";
}

3.3.4 DwdLogApp implementation

3.3.4.1 Identifying new vs. returning visitors

  1) Approach:

    (1) To handle out-of-order data, use event-time semantics

    (2) Key the stream by mid

    (3) Apply a 5-second tumbling window

    (4) Use state to record the timestamp of the first visit

    (5) If the state is empty, the event with the smallest timestamp in the window is the first visit; all other events are not

    (6) If the state is not empty, every event in the window is a non-first visit

  2) Code

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Comparator;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/28 19:32
 */
public class DwdLogApp extends BaseAppV1 {
    final String START_STREAM = "start";
    final String PAGE_STREAM = "page";
    final String DISPLAY_STREAM = "display";

    public static void main(String[] args) {
        new DwdLogApp().init(
                2001,              //REST port
                2,                   //parallelism
                "DwdLogApp",        //checkpoint directory name
                "DwdLogApp",   //consumer group id
                Constant.TOPIC_ODS_LOG  //topic
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) {
        //Business logic
        //1. Confirm new vs. returning users
        SingleOutputStreamOperator<JSONObject> validatedStream = distinguishNewOrOld(sourceStream);
        validatedStream.print();
    }

    private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> sourceStream) {
        /**
         * How do we identify new vs. returning visitors?
         * We need state.
         * To handle out-of-order data: use event time plus a window.
         * */
        //build a WatermarkStrategy for this stream
        return sourceStream
                .map(JSON::parseObject) //parse each record into a JSONObject
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))    //periodic watermarks with a 3-second out-of-orderness bound
                                .withTimestampAssigner((element, recordTimestamp) -> element.getLong("ts"))     //use the ts field as the event-time timestamp
                )
                .keyBy(obj -> obj.getJSONObject("common").getString("mid")) //key by device id (mid)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))   //5-second tumbling event-time window
                .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                    //state used to decide whether this mid has been seen before
                    ValueState<Long> firstWindowState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firstWindowState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("firstWindowState", Long.class));
                    }

                    @Override
                    public void process(String key,
                                        Context context,
                                        Iterable<JSONObject> elements,
                                        Collector<JSONObject> out) throws Exception {
                        //if this is the first window seen for this mid
                        if (firstWindowState.value() == null){
                            //set is_new=1 on the record with the smallest timestamp, 0 on all the others
                            List<JSONObject> list = YuangeCommonUtil.toList(elements);
                            list.sort(Comparator.comparing(o -> o.getLong("ts")));  //sort the records by ts

                            for (int i = 0; i < list.size(); i++) {
                                JSONObject common = list.get(i).getJSONObject("common");
                                if (i == 0){
                                    common.put("is_new","1");   //is_new=1: new visitor
                                    firstWindowState.update(list.get(i).getLong("ts")); //update the state
                                }else {
                                    common.put("is_new","0");   //is_new=0: returning visitor
                                }
                                out.collect(list.get(i));   //emit the processed record downstream
                            }
                        }else {
                            //the state already exists, so every record in this window is a returning visitor: set is_new=0
                            for (JSONObject element : elements) {
                                element.getJSONObject("common").put("is_new","0");
                                out.collect(element);   //emit the processed record downstream
                            }
                        }
                    }
                });
    }
}

  3) Start DwdLogApp

  4) Generate log data (nginx, Hadoop, ZooKeeper, Kafka, and the log server must already be running)

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  5) Check the console; the processed records should be printed

3.3.4.2 Splitting the stream

  Based on content, the logs are split into three categories: page logs, start-up logs, and display logs. In the implementation below, start-up logs stay in the main stream, while page logs and display logs each go to a side-output stream.

  1) Code

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import com.yuange.flinkrealtime.util.YuangeCommonUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/28 19:32
 * Split the log data and write it to the DWD layer (Kafka):
 * 1. Confirm new vs. returning users
 * 2. Split the ods_log stream (using side outputs)
 *     different data goes into different streams:
 *         start-up logs
 *         display logs
 *         page logs
 * 3. Write the data to Kafka
 */
public class DwdLogApp extends BaseAppV1 {
    final String START_STREAM = "start";
    final String PAGE_STREAM = "page";
    final String DISPLAY_STREAM = "display";

    public static void main(String[] args) {
        new DwdLogApp().init(
                2001,              //REST port
                2,                   //parallelism
                "DwdLogApp",        //checkpoint directory name
                "DwdLogApp",   //consumer group id
                Constant.TOPIC_ODS_LOG  //topic
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) {
        //Business logic
        //1. Confirm new vs. returning users
        SingleOutputStreamOperator<JSONObject> validatedStream = distinguishNewOrOld(sourceStream);
//        validatedStream.print();
        //2. Route the different log types into different streams
        HashMap<String, DataStream<JSONObject>> threeStreams = splitSteam(validatedStream);
        //3. Write the data to Kafka
        sendToKafka(threeStreams);
    }

    private void sendToKafka(HashMap<String, DataStream<JSONObject>> threeStreams) {
        threeStreams
                .get(START_STREAM)
                .map(JSONAware::toJSONString)
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_START));

        threeStreams
                .get(PAGE_STREAM)
                .map(JSONAware::toJSONString)
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_PAGE));

        threeStreams
                .get(DISPLAY_STREAM)
                .map(JSONAware::toJSONString)
                .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_DISPLAY));
    }

    private HashMap<String, DataStream<JSONObject>> splitSteam(SingleOutputStreamOperator<JSONObject> validatedStream) {
        /**
         * The logs are split into 3 categories:
         *      1. start-up logs  -> main stream
         *      2. page logs      -> side output
         *      3. display logs   -> side output
         * */
        OutputTag<JSONObject> pageTag = new OutputTag<JSONObject>("page") { //side output for page logs
        };
        OutputTag<JSONObject> displayTag = new OutputTag<JSONObject>("display") {   //side output for display logs
        };
        SingleOutputStreamOperator<JSONObject> startStream = validatedStream.process(new ProcessFunction<JSONObject, JSONObject>() {
            @Override
            public void processElement(JSONObject value,
                                       Context ctx,
                                       Collector<JSONObject> out) throws Exception {
                JSONObject start = value.getJSONObject("start");    //fetch the "start" section, if any
                if (start != null) { //start-up logs stay in the main stream
                    out.collect(value);
                } else {
                    JSONObject page = value.getJSONObject("page");
                    if (page != null) {
                        ctx.output(pageTag, value);  //route the page log to the pageTag side output
                    }

                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null) {
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            // enrich each display record
                            // 1. add the timestamp
                            display.put("ts", value.getLong("ts"));
                            // 2. add the page_id
                            display.put("page_id", value.getJSONObject("page").getString("page_id"));
                            // 3. add all fields from common
                            display.putAll(value.getJSONObject("common"));
                            ctx.output(displayTag, display); //route the enriched record to the display side output
                        }
                    }
                }

            }
        });

        //materialize the side outputs as DataStreams
        DataStream<JSONObject> pageStream = startStream.getSideOutput(pageTag);
        DataStream<JSONObject> displayStream = startStream.getSideOutput(displayTag);
        //collect the three streams in a Map
        HashMap<String, DataStream<JSONObject>> map = new HashMap<>();
        map.put(START_STREAM,startStream);
        map.put(PAGE_STREAM,pageStream);
        map.put(DISPLAY_STREAM,displayStream);
        return map;
    }

    private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> sourceStream) {
        /**
         * How do we identify new vs. returning visitors?
         * We need state.
         * To handle out-of-order data: use event time plus a window.
         * */
        //build a WatermarkStrategy for this stream
        return sourceStream
                .map(JSON::parseObject) //parse each record into a JSONObject
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))    //periodic watermarks with a 3-second out-of-orderness bound
                                .withTimestampAssigner((element, recordTimestamp) -> element.getLong("ts"))     //use the ts field as the event-time timestamp
                )
                .keyBy(obj -> obj.getJSONObject("common").getString("mid")) //key by device id (mid)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))   //5-second tumbling event-time window
                .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                    //state used to decide whether this mid has been seen before
                    ValueState<Long> firstWindowState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firstWindowState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("firstWindowState", Long.class));
                    }

                    @Override
                    public void process(String key,
                                        Context context,
                                        Iterable<JSONObject> elements,
                                        Collector<JSONObject> out) throws Exception {
                        //if this is the first window seen for this mid
                        if (firstWindowState.value() == null){
                            //set is_new=1 on the record with the smallest timestamp, 0 on all the others
                            List<JSONObject> list = YuangeCommonUtil.toList(elements);
                            list.sort(Comparator.comparing(o -> o.getLong("ts")));  //sort the records by ts

                            for (int i = 0; i < list.size(); i++) {
                                JSONObject common = list.get(i).getJSONObject("common");
                                if (i == 0){
                                    common.put("is_new","1");   //is_new=1: new visitor
                                    firstWindowState.update(list.get(i).getLong("ts")); //update the state
                                }else {
                                    common.put("is_new","0");   //is_new=0: returning visitor
                                }
                                out.collect(list.get(i));   //emit the processed record downstream
                            }
                        }else {
                            //the state already exists, so every record in this window is a returning visitor: set is_new=0
                            for (JSONObject element : elements) {
                                element.getJSONObject("common").put("is_new","0");
                                out.collect(element);   //emit the processed record downstream
                            }
                        }
                    }
                });
    }
}

  2) Generate data

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  3) Start Kafka console consumers to check the output

/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwd_start
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwd_page
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwd_display

  4) Verify that each consumer receives the corresponding data

Chapter 4 DWD: Business Data

  Changes to the business data are captured by Maxwell, but Maxwell writes everything into a single topic. That topic mixes fact data with dimension data, which is inconvenient for later processing. This job therefore reads the business data from the Kafka ODS layer, processes it, writes dimension data to HBase, and writes fact data back to Kafka as the business-data DWD layer.

4.1 Main tasks

4.1.1 Receive the Kafka data and filter out empty records

4.1.2 Implement dynamic stream splitting

  Because Maxwell writes all tables into one topic, the tables have to be separated for further processing. Each table has its own characteristics: some are dimension tables, some are fact tables, and some act as both depending on the situation. In real-time computing, dimension data is usually written to a store that supports key lookups, such as HBase, Redis, or MySQL.

  Fact data usually stays in the stream for further processing and is eventually joined into wide tables. But how does the Flink job know which tables are dimension tables and which are fact tables, and which columns should be kept for each table?

  This kind of configuration does not belong in a static config file: every time the business side adds a table, the configuration would have to be changed and the job restarted. What we need is a dynamic configuration scheme: the configuration is stored durably, and whenever it changes, the real-time job picks up the change automatically.

  There are two ways to achieve this: store the configuration in ZooKeeper and react to changes through watches, or store it in a MySQL table.

  We choose the MySQL approach, mainly because initializing and maintaining configuration data with SQL is convenient. Reacting to changes is slightly less immediate, but configuration changes are infrequent, so this is acceptable. This leads to the design implemented below.

4.1.3 Write the split streams to the corresponding tables and topics

  Fact data is written to Kafka topics; dimension data is written to HBase tables.

4.2 Implementation

4.2.1 Design the dynamic configuration table

  1) Create the configuration table and initialize its data

CREATE DATABASE `flink_realtime` CHARACTER SET utf8 COLLATE utf8_general_ci;

USE flink_realtime;

source /opt/software/mock/mock_db/table_process_init.sql;
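
  The init script itself is not reproduced here. Judging from the TableProcess bean and the Flink CDC DDL used later, table_process has roughly the following shape; the column types and the example rows below are illustrative assumptions only:

CREATE TABLE IF NOT EXISTS table_process (
    source_table VARCHAR(200) NOT NULL COMMENT 'source table name',
    operate_type VARCHAR(200) NOT NULL COMMENT 'insert / update / delete',
    sink_type    VARCHAR(200) COMMENT 'hbase or kafka',
    sink_table   VARCHAR(200) COMMENT 'target HBase table or Kafka topic',
    sink_columns VARCHAR(2000) COMMENT 'comma-separated list of columns to keep',
    sink_pk      VARCHAR(200) COMMENT 'primary-key column for HBase tables',
    sink_extend  VARCHAR(200) COMMENT 'extra clause appended when creating the HBase table',
    PRIMARY KEY (source_table, operate_type)
);

-- illustrative rows only; the real values come from table_process_init.sql
-- INSERT INTO table_process VALUES ('order_info', 'insert', 'kafka', 'dwd_order_info', 'id,user_id,order_status,total_amount,create_time', 'id', NULL);
-- INSERT INTO table_process VALUES ('user_info', 'insert', 'hbase', 'dim_user_info', 'id,name,birthday,gender,create_time', 'id', NULL);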

  2) Entity class for the configuration table

package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 袁哥
 * @date 2021/7/29 11:10
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class TableProcess {
    //sink-type constants for dynamic splitting
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //source table
    private String source_table;
    //operation type: insert, update, delete
    private String operate_type;
    //sink type: hbase or kafka
    private String sink_type;
    //target table (or topic)
    private String sink_table;
    //columns to sink
    private String sink_columns;
    //primary-key column
    private String sink_pk;
    //extra clause used when creating the table
    private String sink_extend;
}

4.2.2 Approach

  1) Business data: MySQL -> Maxwell -> Kafka -> Flink

  2) Dynamic configuration data: MySQL -> Flink SQL CDC

  3) Broadcast the configuration stream and connect it with the business-data stream, so the sink destination of the business data can be controlled dynamically

4.2.3 Read the dynamic configuration table

  1) Introduction to Flink SQL CDC

    CDC stands for Change Data Capture, a fairly broad term: any technique that captures changed data can be called CDC. In practice there are two main flavors, query-based CDC and log-based CDC, which differ in capabilities and trade-offs.

  2) The traditional data-sync architecture (what we used earlier):

    Drawback: too many components on the collection side make maintenance cumbersome.

    The improved architecture reads the changes directly with CDC:

    The Flink community has developed flink-cdc-connectors, a source component that can read both full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL.

    It is open source: https://github.com/ververica/flink-cdc-connectors

  3) Update the MySQL configuration to enable binlog monitoring for the flink_realtime database

    (1) Edit the config file

sudo vim /etc/my.cnf
[mysqld]
server-id= 1
#binlog file prefix
log-bin=mysql-bin
#binlog format
binlog_format=row
#databases to log
binlog-do-db=flinkdb
binlog-do-db=flink_realtime

    (2) Restart MySQL

sudo systemctl restart mysqld

    (3) Confirm that MySQL started successfully

sudo systemctl status mysqld
# or
ps -ef | grep mysqld

    (4) Make sure Maxwell does not capture this database; add the following filter to the Maxwell configuration

vim /opt/module/maxwell-1.27.1/config.properties
filter=exclude:flink_realtime.*

  4) Add the CDC dependency

<!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.4.0</version>
</dependency>

  5) Code

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.common.Constant;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author 袁哥
 * @date 2021/7/29 11:33
 * Business data is captured by Maxwell into Kafka as the ODS layer, then consumed and processed by Flink.
 * Configuration data is read directly from MySQL with Flink CDC, which keeps monitoring it: any change to the configuration table is picked up and handled accordingly.
 */
public class DwdDbApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwdDbApp().init(
                2002,               //REST port
                1,                    //parallelism
                "DwdDbApp",          //checkpoint directory name on HDFS
                "DwdDbApp",     //consumer group
                Constant.TOPIC_ODS_DB    //topic to consume
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) {
        //read the configuration table and obtain a configuration stream (CDC)
        SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment);
    }

    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) {
        /**
         * On the first run, read the full table;
         * afterwards, monitor updates to this configuration table in MySQL.
         * */
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        tableEnvironment.executeSql("create table `table_process` (\n" +
                " `source_table` string,\n" +
                " `operate_type` string,\n" +
                " `sink_type` string,\n" +
                " `sink_table` string,\n" +
                " `sink_columns` string,\n" +
                " `sink_pk` string,\n" +
                " `sink_extend` string,\n" +
                " primary key (`source_table`,`operate_type`) not enforced\n" +
                ")with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa', " +
                " 'database-name' = 'flink_realtime', " +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial', " +
                ")"
        );
        /**
         * initial: on start-up, read all existing rows (snapshot); once the snapshot is done, use the binlog to monitor further changes
         * never: only use the binlog to monitor changes
         */
        Table table_process = tableEnvironment.from("table_process");

        return tableEnvironment
                .toRetractStream(table_process, TableProcess.class) //convert the Table into a retract stream of (flag, TableProcess)
                .filter(t -> t.f0)      //keep only the "add" messages
                .map(t -> t.f1);        //return the TableProcess objects
    }
}

4.2.4 Read the business data and apply ETL

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.common.Constant;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author 袁哥
 * @date 2021/7/29 11:33
 * Business data is captured by Maxwell into Kafka as the ODS layer, then consumed and processed by Flink.
 * Configuration data is read directly from MySQL with Flink CDC, which keeps monitoring it: any change to the configuration table is picked up and handled accordingly.
 */
public class DwdDbApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwdDbApp().init(
                2002,               //REST port
                1,                    //parallelism
                "DwdDbApp",          //checkpoint directory name on HDFS
                "DwdDbApp",     //consumer group
                Constant.TOPIC_ODS_DB    //topic to consume
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) {
        //1. ETL the data
        SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream);
        //2. read the configuration table and obtain a configuration stream (CDC)
        SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment);
    }

    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) {
        /**
         * On the first run, read the full table;
         * afterwards, monitor updates to this configuration table in MySQL.
         * */
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        tableEnvironment.executeSql("create table `table_process` (\n" +
                " `source_table` string,\n" +
                " `operate_type` string,\n" +
                " `sink_type` string,\n" +
                " `sink_table` string,\n" +
                " `sink_columns` string,\n" +
                " `sink_pk` string,\n" +
                " `sink_extend` string,\n" +
                " primary key (`source_table`,`operate_type`) not enforced\n" +
                ")with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa', " +
                " 'database-name' = 'flink_realtime', " +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial', " +
                ")"
        );
        /**
         * initial: on start-up, read all existing rows (snapshot); once the snapshot is done, use the binlog to monitor further changes
         * never: only use the binlog to monitor changes
         */
        Table table_process = tableEnvironment.from("table_process");

        return tableEnvironment
                .toRetractStream(table_process, TableProcess.class) //convert the Table into a retract stream of (flag, TableProcess)
                .filter(t -> t.f0)      //keep only the "add" messages
                .map(t -> t.f1);        //return the TableProcess objects
    }

    private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) {
        return dataStream
                .map(JSON::parseObject)  //parse each record into a JSONObject
                .filter(obj ->
                        obj.getString("database") != null
                        && obj.getString("table") != null
                        && obj.getString("type") != null
                        && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type")))
                        && obj.getString("data") != null
                        && obj.getString("data").length() > 10
                );
    }
}

4.2.5 Connect the business-data stream with the configuration stream

  1) Broadcast the configuration stream, connect it with the data stream, and then split the result: fact-table data stays in the main stream, HBase-bound data goes to a side output.

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.common.Constant;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Arrays;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/29 11:33
 * Business data is captured by Maxwell into Kafka as the ODS layer, then consumed and processed by Flink.
 * Configuration data is read directly from MySQL with Flink CDC, which keeps monitoring it: any change to the configuration table is picked up and handled accordingly.
 */
public class DwdDbApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwdDbApp().init(
                2002,               //REST port
                1,                    //parallelism
                "DwdDbApp",          //checkpoint directory name on HDFS
                "DwdDbApp",     //consumer group
                Constant.TOPIC_ODS_DB    //topic to consume
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) {
        //1. ETL the data
        SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream);
        //2. read the configuration table and obtain a configuration stream (CDC)
        SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment);
        //3. connect the data stream with the configuration stream; the result pairs each JSONObject with its TableProcess configuration
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams = connectStreams(etledStream, tableProcessStream);
        //4. split each record dynamically according to its configuration
        Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(connectedStreams);
        kafkaHbaseStreams.f0.print("kafka");
        kafkaHbaseStreams.f1.print("hbase");
    }

    private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams) {
        //side-output tag for HBase-bound data
        OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") {
        };

        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = connectedStreams.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public void processElement(Tuple2<JSONObject, TableProcess> value,
                                       Context ctx,
                                       Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                //keep only the "data" part of the JSONObject; that is all we need downstream
                Tuple2<JSONObject, TableProcess> data = Tuple2.of(value.f0.getJSONObject("data"), value.f1);
                //filter the columns according to sink_columns, keeping only the fields that should be sunk
                filterColumns(data);
                /**
                 * TableProcess.getSink_type tells us where each record should go:
                 * hbase
                 * kafka
                 * */
                String sink_type = value.f1.getSink_type();
                if (TableProcess.SINK_TYPE_KAFKA.equals(sink_type)) { //configuration says kafka: send it via the main stream
                    //fact data is high-volume, so it goes to Kafka through the main stream
                    out.collect(data);
                } else if (TableProcess.SINK_TYPE_HBASE.equals(sink_type)) { //configuration says hbase: send it via the side output
                    //dimension data is low-volume, so it goes to HBase through the side output
                    ctx.output(hbaseTag, data);
                }
            }

            private void filterColumns(Tuple2<JSONObject, TableProcess> data) {
                JSONObject jsonObject = data.f0;
                //split sink_columns of the configuration into a List, e.g.
                /*
                id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
                id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level
                */
                List<String> columns = Arrays.asList(data.f1.getSink_columns().split(","));
                //remove from the JSONObject every key that is not in the columns list
                jsonObject.keySet().removeIf(key -> !columns.contains(key));
            }
        });
        //materialize the side output as a DataStream
        DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag);
        return Tuple2.of(kafkaStream,hbaseStream);
    }

    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> etledStream,
                                                                                        SingleOutputStreamOperator<TableProcess> tableProcessStream) {
        MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpStateDesc", String.class, TableProcess.class);
        /*
            Dynamic splitting
                Goal: produce a new stream whose elements are 2-tuples
                <JSONObject, TableProcess>
            For each record in the data stream, look up its TableProcess:
            key: source_table:operate_type
            value: TableProcess
         */
        //1. turn the configuration stream into a broadcast stream
        BroadcastStream<TableProcess> tpBroadcastStream = tableProcessStream.broadcast(tpStateDesc);
        //2. connect the broadcast stream with the data stream
        return etledStream.keyBy(obj -> obj.getString("table"))    //key by table, then connect each keyed partition with the broadcast stream
                .connect(tpBroadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, JSONObject, TableProcess, Tuple2<JSONObject,TableProcess>>() {
                    @Override
                    public void processElement(JSONObject value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple2<JSONObject,TableProcess>> out) throws Exception {
                        //get the broadcast state, which holds all the configuration entries
                        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);
                        //build the key as table:type
                        String key = value.getString("table") + ":" + value.getString("type").replaceAll("bootstrap-", "");
                        //use the key built from the data record to look up its configuration in the broadcast state (i.e. find a TableProcess for every record)
                        TableProcess tableProcess = broadcastState.get(key);
                        //if tableProcess is null, this record needs no further processing
                        if (tableProcess != null){
                            out.collect(Tuple2.of(value,tableProcess)); //emit the JSON record together with its configuration
                        }

                    }

                    @Override
                    public void processBroadcastElement(TableProcess value,
                                                        Context ctx,
                                                        Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        //write every incoming configuration entry into the broadcast state
                        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);   //get the broadcast state from the context
                        //build the key under which the entry is stored, e.g.
                        /*
                        Source_table    Operate_type
                        activity_info    insert
                        activity_info    update
                        activity_rule    insert
                        activity_rule    update
                        activity_sku    insert
                        activity_sku    update
                        */
                        String key = value.getSource_table() + ":" + value.getOperate_type();
                        //one row of the configuration table is one configuration entry
                        broadcastState.put(key,value);
                    }
                });

    }

    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) {
        /**
         * On the first run, read the full table;
         * afterwards, monitor updates to this configuration table in MySQL.
         * */
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        tableEnvironment.executeSql("create table `table_process` (\n" +
                " `source_table` string,\n" +
                " `operate_type` string,\n" +
                " `sink_type` string,\n" +
                " `sink_table` string,\n" +
                " `sink_columns` string,\n" +
                " `sink_pk` string,\n" +
                " `sink_extend` string,\n" +
                " primary key (`source_table`,`operate_type`) not enforced\n" +
                ")with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa', " +
                " 'database-name' = 'flink_realtime', " +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial' " +
                ")"
        );
        /**
         * initial: on start-up, read all existing rows (snapshot); once the snapshot is done, use the binlog to monitor further changes
         * never: only use the binlog to monitor changes
         */
        Table table_process = tableEnvironment.from("table_process");

        return tableEnvironment
                .toRetractStream(table_process, TableProcess.class) //convert the Table into a retract stream of (flag, TableProcess)
                .filter(t -> t.f0)      //keep only the "add" messages
                .map(t -> t.f1);        //return the TableProcess objects
    }

    private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) {
        return dataStream
                .map(JSON::parseObject)  //parse each record into a JSONObject
                .filter(obj ->
                        obj.getString("database") != null
                        && obj.getString("table") != null
                        && obj.getString("type") != null
                        && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type")))
                        && obj.getString("data") != null
                        && obj.getString("data").length() > 10
                );
    }
}

  2) Start Hadoop (for checkpoint persistence)

hadoop.sh start

  3) Start ZooKeeper

zk start

  4) Start Kafka

kafka.sh start

  5) Start DwdDbApp so it is ready to consume from Kafka

  6) Start Maxwell to monitor changes to the business data in MySQL in real time and push them into Kafka (existing data can also be loaded into Kafka with maxwell-bootstrap)

/opt/module/maxwell-1.27.1/bin/maxwell --config /opt/module/maxwell-1.27.1/config.properties --daemon
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1

  7) Check the output: both streams are printed to the console (with a lot of data, the kafka main-stream output may scroll away, since the console buffer is limited)

  8) Generate business data to simulate new inserts

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

4.2.6 Sink the data to the right destination

  1) Sink to HBase

    (1) Add the Phoenix dependencies

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>

    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

    (2) Add two constants to Constant

public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";    //Ctrl+N: global class search in the IDE
public static final String PHOENIX_URL = "jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181";

    (3) Create a PhoenixSink that extends RichSinkFunction and writes the data to HBase

package com.yuange.flinkrealtime.sink;

import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.util.JdbcUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author 袁哥
 * @date 2021/7/30 23:25
 */
public class PhoenixSink extends RichSinkFunction<Tuple2<JSONObject, TableProcess>> {
    Connection conn;
    ValueState<String> tableCreateState;
    @Override
    public void open(Configuration parameters) throws Exception {
        //Load the driver first; in many cases this is not strictly necessary:
        //most common databases pick a suitable driver automatically from the URL,
        //but the Phoenix driver sometimes has to be loaded explicitly.
        conn = JdbcUtil.getPhoenixConnection();
        //state used to remember whether the target table has already been created
        tableCreateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("tableCreateState", String.class));
    }

    @Override
    public void invoke(Tuple2<JSONObject, TableProcess> value, Context context) throws Exception {
        // 1. check the table; if it does not exist yet, create it in Phoenix
        checkTable(value);
        // 2. then write the record to Phoenix
        writeToPhoenix(value);
    }

    private void writeToPhoenix(Tuple2<JSONObject, TableProcess> value) throws SQLException {
        JSONObject data = value.f0;
        TableProcess tp = value.f1;

        // upsert  into user(id, name, age) values(?,?,?)
        //build the SQL statement
        StringBuilder insertSql = new StringBuilder();
        insertSql
                .append("upsert into ")
                .append(tp.getSink_table())
                .append("(")
                //id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
                .append(tp.getSink_columns())
                .append(")values(")
                //replace every non-comma run with ?
                .append(tp.getSink_columns().replaceAll("[^,]+","?"))
                .append(")");
        PreparedStatement ps = conn.prepareStatement(insertSql.toString());
        //fill in the placeholders of the PreparedStatement
        String[] columnNames = tp.getSink_columns().split(",");
        for (int i = 0; i < columnNames.length; i++) {
            //take the value of the corresponding field from the JSONObject
            Object str = data.getString(columnNames[i]);
            ps.setString(i + 1,str == null ? "" : str.toString());
        }

        ps.execute();
        conn.commit();
        ps.close();
    }

    private void checkTable(Tuple2<JSONObject, TableProcess> value) throws IOException, SQLException {
        if (tableCreateState.value() == null){
            // execute a statement like: create table if not exists user(id varchar, age varchar)
            TableProcess tp = value.f1;
            // build the CREATE TABLE statement
            StringBuilder createSql = new StringBuilder();
            createSql
                    .append("create table if not exists ")
                    .append(tp.getSink_table())
                    .append("(")
                    .append(tp.getSink_columns().replaceAll(","," varchar,"))
                    .append(" varchar, constraint pk primary key(")
                    .append(tp.getSink_pk() == null ? "id" : tp.getSink_pk())
                    .append("))")
                    .append(tp.getSink_extend() == null ? "" : tp.getSink_extend());

            PreparedStatement ps = conn.prepareStatement(createSql.toString());
            ps.execute();
            conn.commit();
            ps.close();
            //update the state
            tableCreateState.update(tp.getSink_table());
        }
    }
}

    (4) Create JdbcUtil to obtain a Phoenix connection

package com.yuange.flinkrealtime.util;

import com.yuange.flinkrealtime.common.Constant;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * @author 袁哥
 * @date 2021/7/30 23:30
 */
public class JdbcUtil  {
    public static Connection getPhoenixConnection() throws ClassNotFoundException, SQLException {
        Class.forName(Constant.PHOENIX_DRIVER);
        return DriverManager.getConnection(Constant.PHOENIX_URL);
    }
}
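
    JdbcUtil depends on two constants that are not shown in this section. Assuming the standard Phoenix thick client, they would look roughly like the sketch below; the driver class is the stock Phoenix driver, while the ZooKeeper quorum and the topic name are assumptions that must match your cluster.

package com.yuange.flinkrealtime.common;

public class Constant {
    // Phoenix thick-client JDBC driver and url (quorum below is an assumption)
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    public static final String PHOENIX_URL = "jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181";

    // ODS topic consumed by DwdDbApp (name is an assumption)
    public static final String TOPIC_ODS_DB = "ods_db";
}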

    (5) Add a getHbaseSink() method to FlinkSinkUtil. Its declared return type is SinkFunction; since PhoenixSink extends RichSinkFunction, which in turn implements SinkFunction, the PhoenixSink instance can be returned directly

public static SinkFunction<Tuple2<JSONObject, TableProcess>> getHbaseSink(){
        return new PhoenixSink();
    }

    (6) Calling FlinkSinkUtil.getHbaseSink() in DwdDbApp writes the dimension data into HBase

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Arrays;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/29 11:33
 * Business data is captured into Kafka by Maxwell to form the ODS layer; Flink then consumes and processes that ODS data.
 * The configuration data is read straight from MySQL with Flink CDC, which keeps watching the table: whenever a row in the config table changes, Flink CDC picks it up and reacts accordingly.
 */
public class DwdDbApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwdDbApp().init(
                2002,               //port
                1,                    //parallelism
                "DwdDbApp",          //checkpoint directory name on HDFS
                "DwdDbApp",     //consumer group
                Constant.TOPIC_ODS_DB    //topic to consume
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) {
        //1. ETL the raw data stream
        SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream);
        //2. Read the configuration table as a config stream (Flink CDC)
        SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment);
        //3. Connect the data stream with the config stream: each JSONObject is paired with its TableProcess config
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams = connectStreams(etledStream, tableProcessStream);
        //4. Dynamically split every record according to its config
        Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(connectedStreams);
//        kafkaHbaseStreams.f0.print("kafka");
//        kafkaHbaseStreams.f1.print("hbase");
        //5. Write the dimension tables to HBase
        sendToHbase(kafkaHbaseStreams.f1);
    }

    private void sendToHbase(DataStream<Tuple2<JSONObject, TableProcess>> stream) {
        /**
         * Tables are not created automatically when writing to HBase (Phoenix):
         * 1. create the table first (dynamically)
         * 2. then write the data
         * */
        stream.keyBy(t -> t.f1.getSink_table()) //key by the Sink_table from the config so table-creation state is per table
                .addSink(FlinkSinkUtil.getHbaseSink());
    }

    private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams) {
        //side-output tag for the HBase stream
        OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") {
        };

        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = connectedStreams.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public void processElement(Tuple2<JSONObject, TableProcess> value,
                                       Context ctx,
                                       Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                //keep only the "data" object from the envelope; that is all we need downstream
                Tuple2<JSONObject, TableProcess> data = Tuple2.of(value.f0.getJSONObject("data"), value.f1);
                //filter the record by sink_columns so that only the columns to be sunk are kept
                filterColumns(data);
                /**
                 * The sink_type in the TableProcess config decides where each record goes:
                 * hbase
                 * kafka
                 * */
                String sink_type = value.f1.getSink_type();
                if (TableProcess.SINK_TYPE_KAFKA.equals(sink_type)) { //config says kafka: send the record to Kafka through the main stream
                    //fact data is high-volume, so it goes to Kafka via the main stream
                    out.collect(data);
                } else if (TableProcess.SINK_TYPE_HBASE.equals(sink_type)) { //config says hbase: send the record to HBase through the side output
                    //dimension data is low-volume, so it goes to HBase via the side output
                    ctx.output(hbaseTag, data);
                }
            }

            private void filterColumns(Tuple2<JSONObject, TableProcess> data) {
                JSONObject jsonObject = data.f0;
                //split the comma-separated sink_columns from the config into a List
                /*
                id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
                id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level
                */
                List<String> columns = Arrays.asList(data.f1.getSink_columns().split(","));
                //remove from the JSONObject every key that is not in the columns list
                jsonObject.keySet().removeIf(key -> !columns.contains(key));
            }
        });
        //expose the side output as a DataStream
        DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag);
        return Tuple2.of(kafkaStream,hbaseStream);
    }

    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> etledStream,
                                                                                        SingleOutputStreamOperator<TableProcess> tableProcessStream) {
        MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpStateDesc", String.class, TableProcess.class);
        /*
            Dynamic splitting
                Goal: build a new stream whose elements are 2-tuples
                <JSONObject, TableProcess>
            For every record in the data stream, look up its TableProcess
            key: source_table:operate_type
            value: TableProcess
         */
        //1. turn the config stream into a broadcast stream
        BroadcastStream<TableProcess> tpBroadcastStream = tableProcessStream.broadcast(tpStateDesc);
        //2. connect the broadcast stream with the data stream
        return etledStream.keyBy(obj -> obj.getString("table"))    //key by table, then connect each keyed partition with the broadcast stream
                .connect(tpBroadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, JSONObject, TableProcess, Tuple2<JSONObject,TableProcess>>() {
                    @Override
                    public void processElement(JSONObject value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple2<JSONObject,TableProcess>> out) throws Exception {
                        //all config entries live in the broadcast state, so fetch it first
                        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);
                        //build the lookup key as table:type (bootstrap-insert is treated as insert)
                        String key = value.getString("table") + ":" + value.getString("type").replaceAll("bootstrap-", "");
                        //use that key to fetch the matching config from the broadcast state (one TableProcess per record)
                        TableProcess tableProcess = broadcastState.get(key);
                        //a null tableProcess means this record needs no further processing
                        if (tableProcess != null){
                            out.collect(Tuple2.of(value,tableProcess)); //emit the JSON record together with its config
                        }

                    }

                    @Override
                    public void processBroadcastElement(TableProcess value,
                                                        Context ctx,
                                                        Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        //store every incoming config entry in the broadcast state
                        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);   //fetch the broadcast state from the context
                        //build the key under which the config is stored
                        /*
                        Source_table    Operate_type
                        activity_info    insert
                        activity_info    update
                        activity_rule    insert
                        activity_rule    update
                        activity_sku    insert
                        activity_sku    update
                        */
                        String key = value.getSource_table() + ":" + value.getOperate_type();
                        //one row of the config table is one config entry
                        broadcastState.put(key,value);
                    }
                });

    }

    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) {
        /**
         * Read the whole table on the first run,
         * then keep watching this MySQL config table for updates
         * */
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        tableEnvironment.executeSql("create table `table_process` (\n" +
                " `source_table` string,\n" +
                " `operate_type` string,\n" +
                " `sink_type` string,\n" +
                " `sink_table` string,\n" +
                " `sink_columns` string,\n" +
                " `sink_pk` string,\n" +
                " `sink_extend` string,\n" +
                " primary key (`source_table`,`operate_type`) not enforced\n" +
                ")with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa', " +
                " 'database-name' = 'flink_realtime', " +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial' " +
                ")"
        );
        /**
         * initial: read all existing rows at startup and keep them in memory; once the snapshot is done, switch to the binlog to track MySQL changes
         * never: skip the snapshot and track MySQL changes through the binlog only
         */
        Table table_process = tableEnvironment.from("table_process");

        return tableEnvironment
                .toRetractStream(table_process, TableProcess.class) //convert the Table into a retract stream that carries inserts and updates
                .filter(t -> t.f0)      //keep only the add/update records
                .map(t -> t.f1);        //emit the TableProcess payload
    }

    private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) {
        return dataStream
                .map(JSON::parseObject)  //parse each record in the stream into a JSONObject
                .filter(obj ->
                        obj.getString("database") != null
                        && obj.getString("table") != null
                        && obj.getString("type") != null
                        && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type")))
                        && obj.getString("data") != null
                        && obj.getString("data").length() > 10
                );
    }
}
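
    For context on the etlDataStream filter above: Maxwell wraps every row change in a JSON envelope similar to the example below (field values are illustrative, not taken from a real run). Bootstrap events arrive with type "bootstrap-insert", which is why the broadcast lookup strips the "bootstrap-" prefix, and the bootstrap-start/bootstrap-complete markers carry an empty data object, which the data.length() > 10 check filters out.

{
  "database": "flinkdb",
  "table": "user_info",
  "type": "bootstrap-insert",
  "ts": 1627634400,
  "data": {
    "id": 1,
    "login_name": "zhangsan",
    "user_level": "1",
    "create_time": "2021-07-30 18:00:00"
  }
}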

    (7) Start HBase

start-hbase.sh

    (8) Open the Phoenix client and list the tables (e.g. with !tables); nothing is there yet

/opt/module/phoenix-5.0.0/bin/sqlline.py

    (9) Start DwdDbApp; it consumes the ODS data from Kafka and reads the configuration table via Flink CDC

    (10) Use maxwell-bootstrap to import the historical data into Kafka (Maxwell must be running first)

/opt/module/maxwell-1.27.1/bin/maxwell --config /opt/module/maxwell-1.27.1/config.properties --daemon
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1

    (11) The user_info table has now been created in Phoenix

    (12) Run maxwell-bootstrap again to import the historical activity_info data into Kafka; the activity_info table is created and populated

/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table activity_info --client_id maxwell_1
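
    Besides sqlline, the tables can also be checked from code with the JdbcUtil defined above. The sketch below is only a convenience check; the table name DIM_USER_INFO is an assumption and should be replaced with whatever sink_table is configured for user_info in table_process.

package com.yuange.flinkrealtime.util;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Quick sanity check against Phoenix using JdbcUtil.
 */
public class PhoenixCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = JdbcUtil.getPhoenixConnection();
             Statement st = conn.createStatement();
             // table name below is an assumption; use the configured sink_table
             ResultSet rs = st.executeQuery("select count(*) from DIM_USER_INFO")) {
            while (rs.next()) {
                System.out.println("rows in DIM_USER_INFO: " + rs.getLong(1));
            }
        }
    }
}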

  2) Sink to Kafka

    (1) Add a getKafkaSink() method to FlinkSinkUtil

public static FlinkKafkaProducer<Tuple2<JSONObject, TableProcess>> getKafkaSink(){
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092");
        properties.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<Tuple2<JSONObject,TableProcess>>(
                "default",
                //Kafka serialization schema
                new KafkaSerializationSchema<Tuple2<JSONObject, TableProcess>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple2<JSONObject, TableProcess> element,
                                                                    @Nullable Long aLong) {
                        return new ProducerRecord<>(
                                element.f1.getSink_table(), //each record carries its config; the Sink_table from TableProcess is used as the Kafka topic
                                null,
                                element.f0.toJSONString().getBytes(StandardCharsets.UTF_8)  //the JSONObject fact data becomes the record value
                                );
                    }
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE    //exactly-once semantics
        );
    }
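
    A note on the producer above: EXACTLY_ONCE only takes effect when checkpointing is enabled, and transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms (15 minutes by default), which is why it is set to 15 * 60 * 1000 here. The sketch below shows the kind of checkpoint configuration this assumes; BaseAppV1.init() presumably does something equivalent, and the interval and HDFS path are illustrative.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // exactly-once checkpoints every 5 seconds (interval is illustrative)
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // checkpoint directory on HDFS (path is an assumption)
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/flink/ck/DwdDbApp");
    }
}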

    (2) Simply call it from DwdDbApp; the complete DwdDbApp code is shown below

package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Arrays;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/29 11:33
 * Business data is captured into Kafka by Maxwell to form the ODS layer; Flink then consumes and processes that ODS data.
 * The configuration data is read straight from MySQL with Flink CDC, which keeps watching the table: whenever a row in the config table changes, Flink CDC picks it up and reacts accordingly.
 */
public class DwdDbApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwdDbApp().init(
                2002,               //port
                1,                    //parallelism
                "DwdDbApp",          //checkpoint directory name on HDFS
                "DwdDbApp",     //consumer group
                Constant.TOPIC_ODS_DB    //topic to consume
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) {
        //1. ETL the raw data stream
        SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream);
        //2. Read the configuration table as a config stream (Flink CDC)
        SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment);
        //3. Connect the data stream with the config stream: each JSONObject is paired with its TableProcess config
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams = connectStreams(etledStream, tableProcessStream);
        //4. Dynamically split every record according to its config
        Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(connectedStreams);
//        kafkaHbaseStreams.f0.print("kafka");
//        kafkaHbaseStreams.f1.print("hbase");
        //5. Write the dimension tables to HBase
        sendToHbase(kafkaHbaseStreams.f1);
        //6. Write the fact tables to Kafka
        sendToKafka(kafkaHbaseStreams.f0);
    }

    private void sendToKafka(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        stream.addSink(FlinkSinkUtil.getKafkaSink());
    }

    private void sendToHbase(DataStream<Tuple2<JSONObject, TableProcess>> stream) {
        /**
         * Tables are not created automatically when writing to HBase (Phoenix):
         * 1. create the table first (dynamically)
         * 2. then write the data
         * */
        stream.keyBy(t -> t.f1.getSink_table()) //key by the Sink_table from the config so table-creation state is per table
                .addSink(FlinkSinkUtil.getHbaseSink());
    }

    private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams) {
        //side-output tag for the HBase stream
        OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") {
        };

        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = connectedStreams.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
            @Override
            public void processElement(Tuple2<JSONObject, TableProcess> value,
                                       Context ctx,
                                       Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                //keep only the "data" object from the envelope; that is all we need downstream
                Tuple2<JSONObject, TableProcess> data = Tuple2.of(value.f0.getJSONObject("data"), value.f1);
                //filter the record by sink_columns so that only the columns to be sunk are kept
                filterColumns(data);
                /**
                 * The sink_type in the TableProcess config decides where each record goes:
                 * hbase
                 * kafka
                 * */
                String sink_type = value.f1.getSink_type();
                if (TableProcess.SINK_TYPE_KAFKA.equals(sink_type)) { //config says kafka: send the record to Kafka through the main stream
                    //fact data is high-volume, so it goes to Kafka via the main stream
                    out.collect(data);
                } else if (TableProcess.SINK_TYPE_HBASE.equals(sink_type)) { //config says hbase: send the record to HBase through the side output
                    //dimension data is low-volume, so it goes to HBase via the side output
                    ctx.output(hbaseTag, data);
                }
            }

            private void filterColumns(Tuple2<JSONObject, TableProcess> data) {
                JSONObject jsonObject = data.f0;
                //split the comma-separated sink_columns from the config into a List
                /*
                id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
                id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level
                */
                List<String> columns = Arrays.asList(data.f1.getSink_columns().split(","));
                //remove from the JSONObject every key that is not in the columns list
                jsonObject.keySet().removeIf(key -> !columns.contains(key));
            }
        });
        //expose the side output as a DataStream
        DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag);
        return Tuple2.of(kafkaStream,hbaseStream);
    }

    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> etledStream,
                                                                                        SingleOutputStreamOperator<TableProcess> tableProcessStream) {
        MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpStateDesc", String.class, TableProcess.class);
        /*
            Dynamic splitting
                Goal: build a new stream whose elements are 2-tuples
                <JSONObject, TableProcess>
            For every record in the data stream, look up its TableProcess
            key: source_table:operate_type
            value: TableProcess
         */
        //1. turn the config stream into a broadcast stream
        BroadcastStream<TableProcess> tpBroadcastStream = tableProcessStream.broadcast(tpStateDesc);
        //2. connect the broadcast stream with the data stream
        return etledStream.keyBy(obj -> obj.getString("table"))    //key by table, then connect each keyed partition with the broadcast stream
                .connect(tpBroadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, JSONObject, TableProcess, Tuple2<JSONObject,TableProcess>>() {
                    @Override
                    public void processElement(JSONObject value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple2<JSONObject,TableProcess>> out) throws Exception {
                        //all config entries live in the broadcast state, so fetch it first
                        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);
                        //build the lookup key as table:type (bootstrap-insert is treated as insert)
                        String key = value.getString("table") + ":" + value.getString("type").replaceAll("bootstrap-", "");
                        //use that key to fetch the matching config from the broadcast state (one TableProcess per record)
                        TableProcess tableProcess = broadcastState.get(key);
                        //a null tableProcess means this record needs no further processing
                        if (tableProcess != null){
                            out.collect(Tuple2.of(value,tableProcess)); //emit the JSON record together with its config
                        }

                    }

                    @Override
                    public void processBroadcastElement(TableProcess value,
                                                        Context ctx,
                                                        Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        //store every incoming config entry in the broadcast state
                        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);   //fetch the broadcast state from the context
                        //build the key under which the config is stored
                        /*
                        Source_table    Operate_type
                        activity_info    insert
                        activity_info    update
                        activity_rule    insert
                        activity_rule    update
                        activity_sku    insert
                        activity_sku    update
                        */
                        String key = value.getSource_table() + ":" + value.getOperate_type();
                        //one row of the config table is one config entry
                        broadcastState.put(key,value);
                    }
                });

    }

    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) {
        /**
         * Read the whole table on the first run,
         * then keep watching this MySQL config table for updates
         * */
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        tableEnvironment.executeSql("create table `table_process` (\n" +
                " `source_table` string,\n" +
                " `operate_type` string,\n" +
                " `sink_type` string,\n" +
                " `sink_table` string,\n" +
                " `sink_columns` string,\n" +
                " `sink_pk` string,\n" +
                " `sink_extend` string,\n" +
                " primary key (`source_table`,`operate_type`) not enforced\n" +
                ")with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa', " +
                " 'database-name' = 'flink_realtime', " +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial' " +
                ")"
        );
        /**
         * initial: read all existing rows at startup and keep them in memory; once the snapshot is done, switch to the binlog to track MySQL changes
         * never: skip the snapshot and track MySQL changes through the binlog only
         */
        Table table_process = tableEnvironment.from("table_process");

        return tableEnvironment
                .toRetractStream(table_process, TableProcess.class) //convert the Table into a retract stream that carries inserts and updates
                .filter(t -> t.f0)      //keep only the add/update records
                .map(t -> t.f1);        //emit the TableProcess payload
    }

    private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) {
        return dataStream
                .map(JSON::parseObject)  //parse each record in the stream into a JSONObject
                .filter(obj ->
                        obj.getString("database") != null
                        && obj.getString("table") != null
                        && obj.getString("type") != null
                        && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type")))
                        && obj.getString("data") != null
                        && obj.getString("data").length() > 10
                );
    }
}

    (3) Start DwdDbApp

    (4) Generate business data to simulate new inserts

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

    (5) Check the data in Kafka

  3) Package and deploy flink-realtime to Linux

    (1) Package with Maven (run a clean first, and stop the DwdDbApp running in IDEA)

    (2) Upload the packaged flink-realtime-1.0-SNAPSHOT.jar to /opt/module/applog

    (3) Start a YARN session (Hadoop, ZooKeeper, Kafka and HBase must be running first)

/opt/module/flink-yarn/bin/yarn-session.sh -d

    (4) Submit flink-realtime-1.0-SNAPSHOT.jar to YARN; write a small submission script to avoid repetitive work

vim /home/atguigu/bin/realtime.sh
#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
        com.yuange.flinkrealtime.app.dwd.DwdLogApp
        com.yuange.flinkrealtime.app.dwd.DwdDbApp
)

for app in ${apps[*]} ; do
        $flink run -d -c $app $jar
done
chmod +x /home/atguigu/bin/realtime.sh
/home/atguigu/bin/realtime.sh

    (5) Check the job status; an exception appears in the logs

    (6) The cause is a dependency conflict: flink-realtime-1.0-SNAPSHOT.jar bundles jars that already exist in the Flink lib directory on the cluster, so the duplicates on one side have to be removed

cd /opt/module/flink-yarn/lib
rm -rf flink-connector-jdbc_2.12-1.13.1.jar flink-connector-kafka_2.12-1.13.1.jar mysql-connector-java-5.1.27-bin.jar

    (7) Restart the YARN session and submit flink-realtime-1.0-SNAPSHOT.jar again

    (8) Check again; there are no more exceptions in the logs

    (9) Drop the tables in HBase

    (10) Delete all the topics in Kafka

    (11) Start the log server (to verify that logs reach Kafka; Nginx must already be running)

log-lg.sh start

    (12) Start Maxwell, then import the data with maxwell-bootstrap

vim /home/atguigu/bin/maxwell.sh
#!/bin/bash
maxwell_home=/opt/module/maxwell-1.27.1
host=$(hostname)
case $1 in
    start)
        echo "========== starting maxwell on $host =========="
        source /etc/profile
        $maxwell_home/bin/maxwell --config $maxwell_home/config.properties --daemon
        ;;

    stop)
        echo "========== stopping maxwell on $host =========="
        source /etc/profile
        jps | awk '/Maxwell/ {print $1}' | xargs kill
        ;;

    *)
        echo "usage: maxwell.sh start|stop"
        echo "  start   start maxwell"
        echo "  stop    stop maxwell"
        ;;
esac
chmod +x /home/atguigu/bin/maxwell.sh
maxwell.sh start
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1

    (13) Check that the user_info table exists in HBase and contains data

    (14) Generate log data and check whether it shows up in Kafka

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

    (15) Generate business data and check whether it shows up in Kafka

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

